diff --git a/ee/connectors/consumer_pool.py b/ee/connectors/consumer_pool.py index 5a2dbf34e..f33dbbc37 100644 --- a/ee/connectors/consumer_pool.py +++ b/ee/connectors/consumer_pool.py @@ -1,7 +1,8 @@ from decouple import config, Csv -import asyncio +import signal +# import asyncio from db.api import DBConnection -from utils import pg_client +# from utils import pg_client from utils.worker import WorkerPool @@ -10,7 +11,7 @@ def main(): database_api = DBConnection(DATABASE) allowed_projects = config('PROJECT_IDS', default=None, cast=Csv(int)) - w_pool = WorkerPool(n_workers=60, + w_pool = WorkerPool(n_workers=config('OR_EE_CONNECTOR_WORKER_COUNT', cast=int, default=60), project_filter=allowed_projects) try: w_pool.load_checkpoint(database_api) @@ -24,6 +25,7 @@ def main(): if __name__ == '__main__': - asyncio.run(pg_client.init()) + # asyncio.run(pg_client.init()) main() + raise Exception('Script terminated') diff --git a/ee/connectors/msgcodec/msgcodec.pyx b/ee/connectors/msgcodec/msgcodec.pyx index ff097f7d8..a9f47af3c 100644 --- a/ee/connectors/msgcodec/msgcodec.pyx +++ b/ee/connectors/msgcodec/msgcodec.pyx @@ -193,51 +193,51 @@ cdef class MessageCodec: if message_id == 0: return Timestamp( - timestamp=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader) ) if message_id == 1: return SessionStart( - timestamp=self.read_uint(reader), - project_id=self.read_uint(reader), - tracker_version=self.read_string(reader), - rev_id=self.read_string(reader), - user_uuid=self.read_string(reader), - user_agent=self.read_string(reader), - user_os=self.read_string(reader), - user_os_version=self.read_string(reader), - user_browser=self.read_string(reader), - user_browser_version=self.read_string(reader), - user_device=self.read_string(reader), - user_device_type=self.read_string(reader), - user_device_memory_size=self.read_uint(reader), - user_device_heap_size=self.read_uint(reader), - user_country=self.read_string(reader), - user_id=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + project_id=MessageCodec.read_uint(reader), + tracker_version=MessageCodec.read_string(reader), + rev_id=MessageCodec.read_string(reader), + user_uuid=MessageCodec.read_string(reader), + user_agent=MessageCodec.read_string(reader), + user_os=MessageCodec.read_string(reader), + user_os_version=MessageCodec.read_string(reader), + user_browser=MessageCodec.read_string(reader), + user_browser_version=MessageCodec.read_string(reader), + user_device=MessageCodec.read_string(reader), + user_device_type=MessageCodec.read_string(reader), + user_device_memory_size=MessageCodec.read_uint(reader), + user_device_heap_size=MessageCodec.read_uint(reader), + user_country=MessageCodec.read_string(reader), + user_id=MessageCodec.read_string(reader) ) if message_id == 3: return SessionEndDeprecated( - timestamp=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader) ) if message_id == 4: return SetPageLocation( - url=self.read_string(reader), - referrer=self.read_string(reader), - navigation_start=self.read_uint(reader) + url=MessageCodec.read_string(reader), + referrer=MessageCodec.read_string(reader), + navigation_start=MessageCodec.read_uint(reader) ) if message_id == 5: return SetViewportSize( - width=self.read_uint(reader), - height=self.read_uint(reader) + width=MessageCodec.read_uint(reader), + height=MessageCodec.read_uint(reader) ) if message_id == 6: return SetViewportScroll( - x=self.read_int(reader), - y=self.read_int(reader) + x=MessageCodec.read_int(reader), + y=MessageCodec.read_int(reader) ) if message_id == 7: @@ -247,515 +247,515 @@ cdef class MessageCodec: if message_id == 8: return CreateElementNode( - id=self.read_uint(reader), - parent_id=self.read_uint(reader), - index=self.read_uint(reader), - tag=self.read_string(reader), - svg=self.read_boolean(reader) + id=MessageCodec.read_uint(reader), + parent_id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader), + tag=MessageCodec.read_string(reader), + svg=MessageCodec.read_boolean(reader) ) if message_id == 9: return CreateTextNode( - id=self.read_uint(reader), - parent_id=self.read_uint(reader), - index=self.read_uint(reader) + id=MessageCodec.read_uint(reader), + parent_id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader) ) if message_id == 10: return MoveNode( - id=self.read_uint(reader), - parent_id=self.read_uint(reader), - index=self.read_uint(reader) + id=MessageCodec.read_uint(reader), + parent_id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader) ) if message_id == 11: return RemoveNode( - id=self.read_uint(reader) + id=MessageCodec.read_uint(reader) ) if message_id == 12: return SetNodeAttribute( - id=self.read_uint(reader), - name=self.read_string(reader), - value=self.read_string(reader) + id=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) ) if message_id == 13: return RemoveNodeAttribute( - id=self.read_uint(reader), - name=self.read_string(reader) + id=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader) ) if message_id == 14: return SetNodeData( - id=self.read_uint(reader), - data=self.read_string(reader) + id=MessageCodec.read_uint(reader), + data=MessageCodec.read_string(reader) ) if message_id == 15: return SetCSSData( - id=self.read_uint(reader), - data=self.read_string(reader) + id=MessageCodec.read_uint(reader), + data=MessageCodec.read_string(reader) ) if message_id == 16: return SetNodeScroll( - id=self.read_uint(reader), - x=self.read_int(reader), - y=self.read_int(reader) + id=MessageCodec.read_uint(reader), + x=MessageCodec.read_int(reader), + y=MessageCodec.read_int(reader) ) if message_id == 17: return SetInputTarget( - id=self.read_uint(reader), - label=self.read_string(reader) + id=MessageCodec.read_uint(reader), + label=MessageCodec.read_string(reader) ) if message_id == 18: return SetInputValue( - id=self.read_uint(reader), - value=self.read_string(reader), - mask=self.read_int(reader) + id=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader), + mask=MessageCodec.read_int(reader) ) if message_id == 19: return SetInputChecked( - id=self.read_uint(reader), - checked=self.read_boolean(reader) + id=MessageCodec.read_uint(reader), + checked=MessageCodec.read_boolean(reader) ) if message_id == 20: return MouseMove( - x=self.read_uint(reader), - y=self.read_uint(reader) + x=MessageCodec.read_uint(reader), + y=MessageCodec.read_uint(reader) ) if message_id == 21: return NetworkRequestDeprecated( - type=self.read_string(reader), - method=self.read_string(reader), - url=self.read_string(reader), - request=self.read_string(reader), - response=self.read_string(reader), - status=self.read_uint(reader), - timestamp=self.read_uint(reader), - duration=self.read_uint(reader) + type=MessageCodec.read_string(reader), + method=MessageCodec.read_string(reader), + url=MessageCodec.read_string(reader), + request=MessageCodec.read_string(reader), + response=MessageCodec.read_string(reader), + status=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader) ) if message_id == 22: return ConsoleLog( - level=self.read_string(reader), - value=self.read_string(reader) + level=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) ) if message_id == 23: return PageLoadTiming( - request_start=self.read_uint(reader), - response_start=self.read_uint(reader), - response_end=self.read_uint(reader), - dom_content_loaded_event_start=self.read_uint(reader), - dom_content_loaded_event_end=self.read_uint(reader), - load_event_start=self.read_uint(reader), - load_event_end=self.read_uint(reader), - first_paint=self.read_uint(reader), - first_contentful_paint=self.read_uint(reader) + request_start=MessageCodec.read_uint(reader), + response_start=MessageCodec.read_uint(reader), + response_end=MessageCodec.read_uint(reader), + dom_content_loaded_event_start=MessageCodec.read_uint(reader), + dom_content_loaded_event_end=MessageCodec.read_uint(reader), + load_event_start=MessageCodec.read_uint(reader), + load_event_end=MessageCodec.read_uint(reader), + first_paint=MessageCodec.read_uint(reader), + first_contentful_paint=MessageCodec.read_uint(reader) ) if message_id == 24: return PageRenderTiming( - speed_index=self.read_uint(reader), - visually_complete=self.read_uint(reader), - time_to_interactive=self.read_uint(reader) + speed_index=MessageCodec.read_uint(reader), + visually_complete=MessageCodec.read_uint(reader), + time_to_interactive=MessageCodec.read_uint(reader) ) if message_id == 25: return JSExceptionDeprecated( - name=self.read_string(reader), - message=self.read_string(reader), - payload=self.read_string(reader) + name=MessageCodec.read_string(reader), + message=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) ) if message_id == 26: return IntegrationEvent( - timestamp=self.read_uint(reader), - source=self.read_string(reader), - name=self.read_string(reader), - message=self.read_string(reader), - payload=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + source=MessageCodec.read_string(reader), + name=MessageCodec.read_string(reader), + message=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) ) if message_id == 27: return CustomEvent( - name=self.read_string(reader), - payload=self.read_string(reader) + name=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) ) if message_id == 28: return UserID( - id=self.read_string(reader) + id=MessageCodec.read_string(reader) ) if message_id == 29: return UserAnonymousID( - id=self.read_string(reader) + id=MessageCodec.read_string(reader) ) if message_id == 30: return Metadata( - key=self.read_string(reader), - value=self.read_string(reader) + key=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) ) if message_id == 31: return PageEvent( - message_id=self.read_uint(reader), - timestamp=self.read_uint(reader), - url=self.read_string(reader), - referrer=self.read_string(reader), - loaded=self.read_boolean(reader), - request_start=self.read_uint(reader), - response_start=self.read_uint(reader), - response_end=self.read_uint(reader), - dom_content_loaded_event_start=self.read_uint(reader), - dom_content_loaded_event_end=self.read_uint(reader), - load_event_start=self.read_uint(reader), - load_event_end=self.read_uint(reader), - first_paint=self.read_uint(reader), - first_contentful_paint=self.read_uint(reader), - speed_index=self.read_uint(reader), - visually_complete=self.read_uint(reader), - time_to_interactive=self.read_uint(reader) + message_id=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + url=MessageCodec.read_string(reader), + referrer=MessageCodec.read_string(reader), + loaded=MessageCodec.read_boolean(reader), + request_start=MessageCodec.read_uint(reader), + response_start=MessageCodec.read_uint(reader), + response_end=MessageCodec.read_uint(reader), + dom_content_loaded_event_start=MessageCodec.read_uint(reader), + dom_content_loaded_event_end=MessageCodec.read_uint(reader), + load_event_start=MessageCodec.read_uint(reader), + load_event_end=MessageCodec.read_uint(reader), + first_paint=MessageCodec.read_uint(reader), + first_contentful_paint=MessageCodec.read_uint(reader), + speed_index=MessageCodec.read_uint(reader), + visually_complete=MessageCodec.read_uint(reader), + time_to_interactive=MessageCodec.read_uint(reader) ) if message_id == 32: return InputEvent( - message_id=self.read_uint(reader), - timestamp=self.read_uint(reader), - value=self.read_string(reader), - value_masked=self.read_boolean(reader), - label=self.read_string(reader) + message_id=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader), + value_masked=MessageCodec.read_boolean(reader), + label=MessageCodec.read_string(reader) ) if message_id == 37: return CSSInsertRule( - id=self.read_uint(reader), - rule=self.read_string(reader), - index=self.read_uint(reader) + id=MessageCodec.read_uint(reader), + rule=MessageCodec.read_string(reader), + index=MessageCodec.read_uint(reader) ) if message_id == 38: return CSSDeleteRule( - id=self.read_uint(reader), - index=self.read_uint(reader) + id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader) ) if message_id == 39: return Fetch( - method=self.read_string(reader), - url=self.read_string(reader), - request=self.read_string(reader), - response=self.read_string(reader), - status=self.read_uint(reader), - timestamp=self.read_uint(reader), - duration=self.read_uint(reader) + method=MessageCodec.read_string(reader), + url=MessageCodec.read_string(reader), + request=MessageCodec.read_string(reader), + response=MessageCodec.read_string(reader), + status=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader) ) if message_id == 40: return Profiler( - name=self.read_string(reader), - duration=self.read_uint(reader), - args=self.read_string(reader), - result=self.read_string(reader) + name=MessageCodec.read_string(reader), + duration=MessageCodec.read_uint(reader), + args=MessageCodec.read_string(reader), + result=MessageCodec.read_string(reader) ) if message_id == 41: return OTable( - key=self.read_string(reader), - value=self.read_string(reader) + key=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) ) if message_id == 42: return StateAction( - type=self.read_string(reader) + type=MessageCodec.read_string(reader) ) if message_id == 44: return Redux( - action=self.read_string(reader), - state=self.read_string(reader), - duration=self.read_uint(reader) + action=MessageCodec.read_string(reader), + state=MessageCodec.read_string(reader), + duration=MessageCodec.read_uint(reader) ) if message_id == 45: return Vuex( - mutation=self.read_string(reader), - state=self.read_string(reader) + mutation=MessageCodec.read_string(reader), + state=MessageCodec.read_string(reader) ) if message_id == 46: return MobX( - type=self.read_string(reader), - payload=self.read_string(reader) + type=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) ) if message_id == 47: return NgRx( - action=self.read_string(reader), - state=self.read_string(reader), - duration=self.read_uint(reader) + action=MessageCodec.read_string(reader), + state=MessageCodec.read_string(reader), + duration=MessageCodec.read_uint(reader) ) if message_id == 48: return GraphQL( - operation_kind=self.read_string(reader), - operation_name=self.read_string(reader), - variables=self.read_string(reader), - response=self.read_string(reader) + operation_kind=MessageCodec.read_string(reader), + operation_name=MessageCodec.read_string(reader), + variables=MessageCodec.read_string(reader), + response=MessageCodec.read_string(reader) ) if message_id == 49: return PerformanceTrack( - frames=self.read_int(reader), - ticks=self.read_int(reader), - total_js_heap_size=self.read_uint(reader), - used_js_heap_size=self.read_uint(reader) + frames=MessageCodec.read_int(reader), + ticks=MessageCodec.read_int(reader), + total_js_heap_size=MessageCodec.read_uint(reader), + used_js_heap_size=MessageCodec.read_uint(reader) ) if message_id == 50: return StringDict( - key=self.read_uint(reader), - value=self.read_string(reader) + key=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader) ) if message_id == 51: return SetNodeAttributeDict( - id=self.read_uint(reader), - name_key=self.read_uint(reader), - value_key=self.read_uint(reader) + id=MessageCodec.read_uint(reader), + name_key=MessageCodec.read_uint(reader), + value_key=MessageCodec.read_uint(reader) ) if message_id == 53: return ResourceTimingDeprecated( - timestamp=self.read_uint(reader), - duration=self.read_uint(reader), - ttfb=self.read_uint(reader), - header_size=self.read_uint(reader), - encoded_body_size=self.read_uint(reader), - decoded_body_size=self.read_uint(reader), - url=self.read_string(reader), - initiator=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader), + ttfb=MessageCodec.read_uint(reader), + header_size=MessageCodec.read_uint(reader), + encoded_body_size=MessageCodec.read_uint(reader), + decoded_body_size=MessageCodec.read_uint(reader), + url=MessageCodec.read_string(reader), + initiator=MessageCodec.read_string(reader) ) if message_id == 54: return ConnectionInformation( - downlink=self.read_uint(reader), - type=self.read_string(reader) + downlink=MessageCodec.read_uint(reader), + type=MessageCodec.read_string(reader) ) if message_id == 55: return SetPageVisibility( - hidden=self.read_boolean(reader) + hidden=MessageCodec.read_boolean(reader) ) if message_id == 56: return PerformanceTrackAggr( - timestamp_start=self.read_uint(reader), - timestamp_end=self.read_uint(reader), - min_fps=self.read_uint(reader), - avg_fps=self.read_uint(reader), - max_fps=self.read_uint(reader), - min_cpu=self.read_uint(reader), - avg_cpu=self.read_uint(reader), - max_cpu=self.read_uint(reader), - min_total_js_heap_size=self.read_uint(reader), - avg_total_js_heap_size=self.read_uint(reader), - max_total_js_heap_size=self.read_uint(reader), - min_used_js_heap_size=self.read_uint(reader), - avg_used_js_heap_size=self.read_uint(reader), - max_used_js_heap_size=self.read_uint(reader) + timestamp_start=MessageCodec.read_uint(reader), + timestamp_end=MessageCodec.read_uint(reader), + min_fps=MessageCodec.read_uint(reader), + avg_fps=MessageCodec.read_uint(reader), + max_fps=MessageCodec.read_uint(reader), + min_cpu=MessageCodec.read_uint(reader), + avg_cpu=MessageCodec.read_uint(reader), + max_cpu=MessageCodec.read_uint(reader), + min_total_js_heap_size=MessageCodec.read_uint(reader), + avg_total_js_heap_size=MessageCodec.read_uint(reader), + max_total_js_heap_size=MessageCodec.read_uint(reader), + min_used_js_heap_size=MessageCodec.read_uint(reader), + avg_used_js_heap_size=MessageCodec.read_uint(reader), + max_used_js_heap_size=MessageCodec.read_uint(reader) ) if message_id == 57: return LoadFontFace( - parent_id=self.read_uint(reader), - family=self.read_string(reader), - source=self.read_string(reader), - descriptors=self.read_string(reader) + parent_id=MessageCodec.read_uint(reader), + family=MessageCodec.read_string(reader), + source=MessageCodec.read_string(reader), + descriptors=MessageCodec.read_string(reader) ) if message_id == 58: return SetNodeFocus( - id=self.read_int(reader) + id=MessageCodec.read_int(reader) ) if message_id == 59: return LongTask( - timestamp=self.read_uint(reader), - duration=self.read_uint(reader), - context=self.read_uint(reader), - container_type=self.read_uint(reader), - container_src=self.read_string(reader), - container_id=self.read_string(reader), - container_name=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader), + context=MessageCodec.read_uint(reader), + container_type=MessageCodec.read_uint(reader), + container_src=MessageCodec.read_string(reader), + container_id=MessageCodec.read_string(reader), + container_name=MessageCodec.read_string(reader) ) if message_id == 60: return SetNodeAttributeURLBased( - id=self.read_uint(reader), - name=self.read_string(reader), - value=self.read_string(reader), - base_url=self.read_string(reader) + id=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader), + base_url=MessageCodec.read_string(reader) ) if message_id == 61: return SetCSSDataURLBased( - id=self.read_uint(reader), - data=self.read_string(reader), - base_url=self.read_string(reader) + id=MessageCodec.read_uint(reader), + data=MessageCodec.read_string(reader), + base_url=MessageCodec.read_string(reader) ) if message_id == 62: return IssueEventDeprecated( - message_id=self.read_uint(reader), - timestamp=self.read_uint(reader), - type=self.read_string(reader), - context_string=self.read_string(reader), - context=self.read_string(reader), - payload=self.read_string(reader) + message_id=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + type=MessageCodec.read_string(reader), + context_string=MessageCodec.read_string(reader), + context=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) ) if message_id == 63: return TechnicalInfo( - type=self.read_string(reader), - value=self.read_string(reader) + type=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) ) if message_id == 64: return CustomIssue( - name=self.read_string(reader), - payload=self.read_string(reader) + name=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) ) if message_id == 66: return AssetCache( - url=self.read_string(reader) + url=MessageCodec.read_string(reader) ) if message_id == 67: return CSSInsertRuleURLBased( - id=self.read_uint(reader), - rule=self.read_string(reader), - index=self.read_uint(reader), - base_url=self.read_string(reader) + id=MessageCodec.read_uint(reader), + rule=MessageCodec.read_string(reader), + index=MessageCodec.read_uint(reader), + base_url=MessageCodec.read_string(reader) ) if message_id == 69: return MouseClick( - id=self.read_uint(reader), - hesitation_time=self.read_uint(reader), - label=self.read_string(reader), - selector=self.read_string(reader) + id=MessageCodec.read_uint(reader), + hesitation_time=MessageCodec.read_uint(reader), + label=MessageCodec.read_string(reader), + selector=MessageCodec.read_string(reader) ) if message_id == 70: return CreateIFrameDocument( - frame_id=self.read_uint(reader), - id=self.read_uint(reader) + frame_id=MessageCodec.read_uint(reader), + id=MessageCodec.read_uint(reader) ) if message_id == 71: return AdoptedSSReplaceURLBased( - sheet_id=self.read_uint(reader), - text=self.read_string(reader), - base_url=self.read_string(reader) + sheet_id=MessageCodec.read_uint(reader), + text=MessageCodec.read_string(reader), + base_url=MessageCodec.read_string(reader) ) if message_id == 72: return AdoptedSSReplace( - sheet_id=self.read_uint(reader), - text=self.read_string(reader) + sheet_id=MessageCodec.read_uint(reader), + text=MessageCodec.read_string(reader) ) if message_id == 73: return AdoptedSSInsertRuleURLBased( - sheet_id=self.read_uint(reader), - rule=self.read_string(reader), - index=self.read_uint(reader), - base_url=self.read_string(reader) + sheet_id=MessageCodec.read_uint(reader), + rule=MessageCodec.read_string(reader), + index=MessageCodec.read_uint(reader), + base_url=MessageCodec.read_string(reader) ) if message_id == 74: return AdoptedSSInsertRule( - sheet_id=self.read_uint(reader), - rule=self.read_string(reader), - index=self.read_uint(reader) + sheet_id=MessageCodec.read_uint(reader), + rule=MessageCodec.read_string(reader), + index=MessageCodec.read_uint(reader) ) if message_id == 75: return AdoptedSSDeleteRule( - sheet_id=self.read_uint(reader), - index=self.read_uint(reader) + sheet_id=MessageCodec.read_uint(reader), + index=MessageCodec.read_uint(reader) ) if message_id == 76: return AdoptedSSAddOwner( - sheet_id=self.read_uint(reader), - id=self.read_uint(reader) + sheet_id=MessageCodec.read_uint(reader), + id=MessageCodec.read_uint(reader) ) if message_id == 77: return AdoptedSSRemoveOwner( - sheet_id=self.read_uint(reader), - id=self.read_uint(reader) + sheet_id=MessageCodec.read_uint(reader), + id=MessageCodec.read_uint(reader) ) if message_id == 78: return JSException( - name=self.read_string(reader), - message=self.read_string(reader), - payload=self.read_string(reader), - metadata=self.read_string(reader) + name=MessageCodec.read_string(reader), + message=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader), + metadata=MessageCodec.read_string(reader) ) if message_id == 79: return Zustand( - mutation=self.read_string(reader), - state=self.read_string(reader) + mutation=MessageCodec.read_string(reader), + state=MessageCodec.read_string(reader) ) if message_id == 80: return BatchMeta( - page_no=self.read_uint(reader), - first_index=self.read_uint(reader), - timestamp=self.read_int(reader) + page_no=MessageCodec.read_uint(reader), + first_index=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_int(reader) ) if message_id == 81: return BatchMetadata( - version=self.read_uint(reader), - page_no=self.read_uint(reader), - first_index=self.read_uint(reader), - timestamp=self.read_int(reader), - location=self.read_string(reader) + version=MessageCodec.read_uint(reader), + page_no=MessageCodec.read_uint(reader), + first_index=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_int(reader), + location=MessageCodec.read_string(reader) ) if message_id == 82: return PartitionedMessage( - part_no=self.read_uint(reader), - part_total=self.read_uint(reader) + part_no=MessageCodec.read_uint(reader), + part_total=MessageCodec.read_uint(reader) ) if message_id == 83: return NetworkRequest( - type=self.read_string(reader), - method=self.read_string(reader), - url=self.read_string(reader), - request=self.read_string(reader), - response=self.read_string(reader), - status=self.read_uint(reader), - timestamp=self.read_uint(reader), - duration=self.read_uint(reader), - transferred_body_size=self.read_uint(reader) + type=MessageCodec.read_string(reader), + method=MessageCodec.read_string(reader), + url=MessageCodec.read_string(reader), + request=MessageCodec.read_string(reader), + response=MessageCodec.read_string(reader), + status=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader), + transferred_body_size=MessageCodec.read_uint(reader) ) if message_id == 84: @@ -770,256 +770,256 @@ cdef class MessageCodec: if message_id == 112: return InputChange( - id=self.read_uint(reader), - value=self.read_string(reader), - value_masked=self.read_boolean(reader), - label=self.read_string(reader), - hesitation_time=self.read_int(reader), - input_duration=self.read_int(reader) + id=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader), + value_masked=MessageCodec.read_boolean(reader), + label=MessageCodec.read_string(reader), + hesitation_time=MessageCodec.read_int(reader), + input_duration=MessageCodec.read_int(reader) ) if message_id == 113: return SelectionChange( - selection_start=self.read_uint(reader), - selection_end=self.read_uint(reader), - selection=self.read_string(reader) + selection_start=MessageCodec.read_uint(reader), + selection_end=MessageCodec.read_uint(reader), + selection=MessageCodec.read_string(reader) ) if message_id == 114: return MouseThrashing( - timestamp=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader) ) if message_id == 115: return UnbindNodes( - total_removed_percent=self.read_uint(reader) + total_removed_percent=MessageCodec.read_uint(reader) ) if message_id == 116: return ResourceTiming( - timestamp=self.read_uint(reader), - duration=self.read_uint(reader), - ttfb=self.read_uint(reader), - header_size=self.read_uint(reader), - encoded_body_size=self.read_uint(reader), - decoded_body_size=self.read_uint(reader), - url=self.read_string(reader), - initiator=self.read_string(reader), - transferred_size=self.read_uint(reader), - cached=self.read_boolean(reader) + timestamp=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader), + ttfb=MessageCodec.read_uint(reader), + header_size=MessageCodec.read_uint(reader), + encoded_body_size=MessageCodec.read_uint(reader), + decoded_body_size=MessageCodec.read_uint(reader), + url=MessageCodec.read_string(reader), + initiator=MessageCodec.read_string(reader), + transferred_size=MessageCodec.read_uint(reader), + cached=MessageCodec.read_boolean(reader) ) if message_id == 117: return TabChange( - tab_id=self.read_string(reader) + tab_id=MessageCodec.read_string(reader) ) if message_id == 118: return TabData( - tab_id=self.read_string(reader) + tab_id=MessageCodec.read_string(reader) ) if message_id == 119: return CanvasNode( - node_id=self.read_string(reader), - timestamp=self.read_uint(reader) + node_id=MessageCodec.read_string(reader), + timestamp=MessageCodec.read_uint(reader) ) if message_id == 125: return IssueEvent( - message_id=self.read_uint(reader), - timestamp=self.read_uint(reader), - type=self.read_string(reader), - context_string=self.read_string(reader), - context=self.read_string(reader), - payload=self.read_string(reader), - url=self.read_string(reader) + message_id=MessageCodec.read_uint(reader), + timestamp=MessageCodec.read_uint(reader), + type=MessageCodec.read_string(reader), + context_string=MessageCodec.read_string(reader), + context=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader), + url=MessageCodec.read_string(reader) ) if message_id == 126: return SessionEnd( - timestamp=self.read_uint(reader), - encryption_key=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + encryption_key=MessageCodec.read_string(reader) ) if message_id == 127: return SessionSearch( - timestamp=self.read_uint(reader), - partition=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader), + partition=MessageCodec.read_uint(reader) ) if message_id == 90: return IOSSessionStart( - timestamp=self.read_uint(reader), - project_id=self.read_uint(reader), - tracker_version=self.read_string(reader), - rev_id=self.read_string(reader), - user_uuid=self.read_string(reader), - user_os=self.read_string(reader), - user_os_version=self.read_string(reader), - user_device=self.read_string(reader), - user_device_type=self.read_string(reader), - user_country=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + project_id=MessageCodec.read_uint(reader), + tracker_version=MessageCodec.read_string(reader), + rev_id=MessageCodec.read_string(reader), + user_uuid=MessageCodec.read_string(reader), + user_os=MessageCodec.read_string(reader), + user_os_version=MessageCodec.read_string(reader), + user_device=MessageCodec.read_string(reader), + user_device_type=MessageCodec.read_string(reader), + user_country=MessageCodec.read_string(reader) ) if message_id == 91: return IOSSessionEnd( - timestamp=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader) ) if message_id == 92: return IOSMetadata( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - key=self.read_string(reader), - value=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + key=MessageCodec.read_string(reader), + value=MessageCodec.read_string(reader) ) if message_id == 93: return IOSEvent( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - name=self.read_string(reader), - payload=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) ) if message_id == 94: return IOSUserID( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - id=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + id=MessageCodec.read_string(reader) ) if message_id == 95: return IOSUserAnonymousID( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - id=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + id=MessageCodec.read_string(reader) ) if message_id == 96: return IOSScreenChanges( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - x=self.read_uint(reader), - y=self.read_uint(reader), - width=self.read_uint(reader), - height=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + x=MessageCodec.read_uint(reader), + y=MessageCodec.read_uint(reader), + width=MessageCodec.read_uint(reader), + height=MessageCodec.read_uint(reader) ) if message_id == 97: return IOSCrash( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - name=self.read_string(reader), - reason=self.read_string(reader), - stacktrace=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + reason=MessageCodec.read_string(reader), + stacktrace=MessageCodec.read_string(reader) ) if message_id == 98: return IOSViewComponentEvent( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - screen_name=self.read_string(reader), - view_name=self.read_string(reader), - visible=self.read_boolean(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + screen_name=MessageCodec.read_string(reader), + view_name=MessageCodec.read_string(reader), + visible=MessageCodec.read_boolean(reader) ) if message_id == 100: return IOSClickEvent( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - label=self.read_string(reader), - x=self.read_uint(reader), - y=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + label=MessageCodec.read_string(reader), + x=MessageCodec.read_uint(reader), + y=MessageCodec.read_uint(reader) ) if message_id == 101: return IOSInputEvent( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - value=self.read_string(reader), - value_masked=self.read_boolean(reader), - label=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + value=MessageCodec.read_string(reader), + value_masked=MessageCodec.read_boolean(reader), + label=MessageCodec.read_string(reader) ) if message_id == 102: return IOSPerformanceEvent( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - name=self.read_string(reader), - value=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + name=MessageCodec.read_string(reader), + value=MessageCodec.read_uint(reader) ) if message_id == 103: return IOSLog( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - severity=self.read_string(reader), - content=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + severity=MessageCodec.read_string(reader), + content=MessageCodec.read_string(reader) ) if message_id == 104: return IOSInternalError( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - content=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + content=MessageCodec.read_string(reader) ) if message_id == 105: return IOSNetworkCall( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - type=self.read_string(reader), - method=self.read_string(reader), - url=self.read_string(reader), - request=self.read_string(reader), - response=self.read_string(reader), - status=self.read_uint(reader), - duration=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + type=MessageCodec.read_string(reader), + method=MessageCodec.read_string(reader), + url=MessageCodec.read_string(reader), + request=MessageCodec.read_string(reader), + response=MessageCodec.read_string(reader), + status=MessageCodec.read_uint(reader), + duration=MessageCodec.read_uint(reader) ) if message_id == 106: return IOSSwipeEvent( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - label=self.read_string(reader), - x=self.read_uint(reader), - y=self.read_uint(reader), - direction=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + label=MessageCodec.read_string(reader), + x=MessageCodec.read_uint(reader), + y=MessageCodec.read_uint(reader), + direction=MessageCodec.read_string(reader) ) if message_id == 107: return IOSBatchMeta( - timestamp=self.read_uint(reader), - length=self.read_uint(reader), - first_index=self.read_uint(reader) + timestamp=MessageCodec.read_uint(reader), + length=MessageCodec.read_uint(reader), + first_index=MessageCodec.read_uint(reader) ) if message_id == 110: return IOSPerformanceAggregated( - timestamp_start=self.read_uint(reader), - timestamp_end=self.read_uint(reader), - min_fps=self.read_uint(reader), - avg_fps=self.read_uint(reader), - max_fps=self.read_uint(reader), - min_cpu=self.read_uint(reader), - avg_cpu=self.read_uint(reader), - max_cpu=self.read_uint(reader), - min_memory=self.read_uint(reader), - avg_memory=self.read_uint(reader), - max_memory=self.read_uint(reader), - min_battery=self.read_uint(reader), - avg_battery=self.read_uint(reader), - max_battery=self.read_uint(reader) + timestamp_start=MessageCodec.read_uint(reader), + timestamp_end=MessageCodec.read_uint(reader), + min_fps=MessageCodec.read_uint(reader), + avg_fps=MessageCodec.read_uint(reader), + max_fps=MessageCodec.read_uint(reader), + min_cpu=MessageCodec.read_uint(reader), + avg_cpu=MessageCodec.read_uint(reader), + max_cpu=MessageCodec.read_uint(reader), + min_memory=MessageCodec.read_uint(reader), + avg_memory=MessageCodec.read_uint(reader), + max_memory=MessageCodec.read_uint(reader), + min_battery=MessageCodec.read_uint(reader), + avg_battery=MessageCodec.read_uint(reader), + max_battery=MessageCodec.read_uint(reader) ) if message_id == 111: return IOSIssueEvent( - timestamp=self.read_uint(reader), - type=self.read_string(reader), - context_string=self.read_string(reader), - context=self.read_string(reader), - payload=self.read_string(reader) + timestamp=MessageCodec.read_uint(reader), + type=MessageCodec.read_string(reader), + context_string=MessageCodec.read_string(reader), + context=MessageCodec.read_string(reader), + payload=MessageCodec.read_string(reader) ) diff --git a/ee/connectors/utils/pg_client.py b/ee/connectors/utils/pg_client.py index 1bfad6d36..55c5d2172 100644 --- a/ee/connectors/utils/pg_client.py +++ b/ee/connectors/utils/pg_client.py @@ -1,23 +1,30 @@ import logging import time -from threading import Semaphore - -import psycopg2 -import psycopg2.extras +from sqlalchemy import create_engine +from sqlalchemy import MetaData +from sqlalchemy.orm import sessionmaker, session +from contextlib import contextmanager +import logging +from decouple import config as _config +from decouple import Choices +from contextlib import contextmanager from decouple import config -from psycopg2 import pool logging.basicConfig(level=config("LOGLEVEL", default=logging.INFO)) logging.getLogger('apscheduler').setLevel(config("LOGLEVEL", default=logging.INFO)) +sslmode = _config('DB_SSLMODE', + cast=Choices(['disable', 'allow', 'prefer', 'require', 'verify-ca', 'verify-full']), + default='allow' +) + conn_str = config('string_connection', default='') if conn_str == '': - _PG_CONFIG = {"host": config("pg_host"), - "database": config("pg_dbname"), - "user": config("pg_user"), - "password": config("pg_password"), - "port": config("pg_port", cast=int), - "application_name": config("APP_NAME", default="PY")} + pg_host = config("pg_host") + pg_dbname = config("pg_dbname") + pg_user = config("pg_user") + pg_password = config("pg_password") + pg_port = config("pg_port", cast=int) else: import urllib.parse conn_str = urllib.parse.unquote(conn_str) @@ -32,174 +39,28 @@ else: else: pg_host, pg_port = host_info.split(':') pg_port = int(pg_port) - _PG_CONFIG = {"host": pg_host, - "database": pg_dbname, - "user": pg_user, - "password": pg_password, - "port": pg_port, - "application_name": config("APP_NAME", default="PY")} - -PG_CONFIG = dict(_PG_CONFIG) -if config("PG_TIMEOUT", cast=int, default=0) > 0: - PG_CONFIG["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int) * 1000}" - - -class ORThreadedConnectionPool(psycopg2.pool.ThreadedConnectionPool): - def __init__(self, minconn, maxconn, *args, **kwargs): - self._semaphore = Semaphore(maxconn) - super().__init__(minconn, maxconn, *args, **kwargs) - - def getconn(self, *args, **kwargs): - self._semaphore.acquire() - try: - return super().getconn(*args, **kwargs) - except psycopg2.pool.PoolError as e: - if str(e) == "connection pool is closed": - make_pool() - raise e - - def putconn(self, *args, **kwargs): - try: - super().putconn(*args, **kwargs) - self._semaphore.release() - except psycopg2.pool.PoolError as e: - if str(e) == "trying to put unkeyed connection": - print("!!! trying to put unkeyed connection") - print(f"env-PG_POOL:{config('PG_POOL', default=None)}") - return - raise e - - -postgreSQL_pool: ORThreadedConnectionPool = None - -RETRY_MAX = config("PG_RETRY_MAX", cast=int, default=50) -RETRY_INTERVAL = config("PG_RETRY_INTERVAL", cast=int, default=2) -RETRY = 0 - - -def make_pool(): - if not config('PG_POOL', cast=bool, default=True): - return - global postgreSQL_pool - global RETRY - if postgreSQL_pool is not None: - try: - postgreSQL_pool.closeall() - except (Exception, psycopg2.DatabaseError) as error: - logging.error("Error while closing all connexions to PostgreSQL", error) - try: - postgreSQL_pool = ORThreadedConnectionPool(config("PG_MINCONN", cast=int, default=20), - config("PG_MAXCONN", cast=int, default=80), - **PG_CONFIG) - if (postgreSQL_pool): - logging.info("Connection pool created successfully") - except (Exception, psycopg2.DatabaseError) as error: - logging.error("Error while connecting to PostgreSQL", error) - if RETRY < RETRY_MAX: - RETRY += 1 - logging.info(f"waiting for {RETRY_INTERVAL}s before retry n°{RETRY}") - time.sleep(RETRY_INTERVAL) - make_pool() - else: - raise error +conn_str = f"postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_dbname}" class PostgresClient: - connection = None - cursor = None - long_query = False - unlimited_query = False + CONNECTION_STRING: str = conn_str + _sessions = sessionmaker() - def __init__(self, long_query=False, unlimited_query=False, use_pool=True): - self.long_query = long_query - self.unlimited_query = unlimited_query - self.use_pool = use_pool - if unlimited_query: - long_config = dict(_PG_CONFIG) - long_config["application_name"] += "-UNLIMITED" - self.connection = psycopg2.connect(**long_config) - elif long_query: - long_config = dict(_PG_CONFIG) - long_config["application_name"] += "-LONG" - long_config["options"] = f"-c statement_timeout=" \ - f"{config('pg_long_timeout', cast=int, default=5 * 60) * 1000}" - self.connection = psycopg2.connect(**long_config) - elif not use_pool or not config('PG_POOL', cast=bool, default=True): - single_config = dict(_PG_CONFIG) - single_config["application_name"] += "-NOPOOL" - single_config["options"] = f"-c statement_timeout={config('PG_TIMEOUT', cast=int, default=30) * 1000}" - self.connection = psycopg2.connect(**single_config) - else: - self.connection = postgreSQL_pool.getconn() + def __init__(self): + self.engine = create_engine(self.CONNECTION_STRING, connect_args={'sslmode': sslmode}) - def __enter__(self): - if self.cursor is None: - self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) - self.cursor.cursor_execute = self.cursor.execute - self.cursor.execute = self.__execute - self.cursor.recreate = self.recreate_cursor - return self.cursor + @contextmanager + def get_live_session(self) -> session: + """ + This is a session that can be committed. + Changes will be reflected in the database. + """ + # Automatic transaction and connection handling in session + connection = self.engine.connect() + my_session = type(self)._sessions(bind=connection) - def __exit__(self, *args): - try: - self.connection.commit() - self.cursor.close() - if not self.use_pool or self.long_query or self.unlimited_query: - self.connection.close() - except Exception as error: - logging.error("Error while committing/closing PG-connection", error) - if str(error) == "connection already closed" \ - and self.use_pool \ - and not self.long_query \ - and not self.unlimited_query \ - and config('PG_POOL', cast=bool, default=True): - logging.info("Recreating the connexion pool") - make_pool() - else: - raise error - finally: - if config('PG_POOL', cast=bool, default=True) \ - and self.use_pool \ - and not self.long_query \ - and not self.unlimited_query: - postgreSQL_pool.putconn(self.connection) + yield my_session - def __execute(self, query, vars=None): - try: - result = self.cursor.cursor_execute(query=query, vars=vars) - except psycopg2.Error as error: - logging.error(f"!!! Error of type:{type(error)} while executing query:") - logging.error(query) - logging.info("starting rollback to allow future execution") - self.connection.rollback() - raise error - return result + my_session.close() + connection.close() - def recreate_cursor(self, rollback=False): - if rollback: - try: - self.connection.rollback() - except Exception as error: - logging.error("Error while rollbacking connection for recreation", error) - try: - self.cursor.close() - except Exception as error: - logging.error("Error while closing cursor for recreation", error) - self.cursor = None - return self.__enter__() - - -async def init(): - logging.info(f">PG_POOL:{config('PG_POOL', default=None)}") - if config('PG_POOL', cast=bool, default=True): - make_pool() - - -async def terminate(): - global postgreSQL_pool - if postgreSQL_pool is not None: - try: - postgreSQL_pool.closeall() - logging.info("Closed all connexions to PostgreSQL") - except (Exception, psycopg2.DatabaseError) as error: - logging.error("Error while closing all connexions to PostgreSQL", error) diff --git a/ee/connectors/utils/worker.py b/ee/connectors/utils/worker.py index a79e3dd3a..38780f892 100644 --- a/ee/connectors/utils/worker.py +++ b/ee/connectors/utils/worker.py @@ -151,7 +151,7 @@ class ProjectFilter: def read_from_kafka(pipe: Connection, params: dict): global UPLOAD_RATE, max_kafka_read # try: - asyncio.run(pg_client.init()) + # asyncio.run(pg_client.init()) kafka_consumer = init_consumer() project_filter = params['project_filter'] capture_messages = list() @@ -207,7 +207,7 @@ def read_from_kafka(pipe: Connection, params: dict): print('[WORKER INFO] Closing consumer') close_consumer(kafka_consumer) print('[WORKER INFO] Closing pg connection') - asyncio.run(pg_client.terminate()) + # asyncio.run(pg_client.terminate()) print('[WORKER INFO] Successfully closed reader task') # except Exception as e: # print('[WARN]', repr(e)) @@ -223,12 +223,12 @@ def into_batch(batch: list[Event | DetailedEvent], session_id: int, n: Session): def project_from_session(sessionId: int): """Search projectId of requested sessionId in PG table sessions""" - with pg_client.PostgresClient() as conn: - conn.execute( + with pg_client.PostgresClient().get_live_session() as conn: + cur = conn.execute( conn.mogrify("SELECT project_id FROM sessions WHERE session_id=%(sessionId)s LIMIT 1", {'sessionId': sessionId}) ) - res = conn.fetchone() + res = cur.fetchone() if res is None: print(f'[WORKER WARN] sessionid {sessionId} not found in sessions table') return None @@ -241,13 +241,13 @@ def project_from_sessions(sessionIds: list[int]): while sessionIds: sessIds = sessionIds[-1000:] try: - with pg_client.PostgresClient() as conn: - conn.execute( + with pg_client.PostgresClient().get_live_session() as conn: + cur = conn.execute( "SELECT session_id, project_id FROM sessions WHERE session_id IN ({sessionIds})".format( sessionIds=','.join([str(sessId) for sessId in sessIds]) ) ) - res = conn.fetchall() + res = cur.fetchall() except Exception as e: print('[WORKER project_from_sessions]', repr(e)) raise e @@ -320,16 +320,16 @@ def fix_missing_redshift(): return # logging.info(f'[FILL INFO] {len(res)} length response') sessionids = list(map(lambda k: str(k), res['sessionid'])) - asyncio.run(pg_client.init()) + # asyncio.run(pg_client.init()) try: - with pg_client.PostgresClient() as conn: - conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format( + with pg_client.PostgresClient().get_live_session() as conn: + cur = conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format( session_id_list=','.join(sessionids)) ) - pg_res = conn.fetchall() + pg_res = cur.fetchall() except Exception as e: #logging.error(f'[ERROR] Error while selecting from pg: {repr(e)}') - asyncio.run(pg_client.terminate()) + # asyncio.run(pg_client.terminate()) return logging.info(f'response from pg, length {len(pg_res)}') df = pd.DataFrame(pg_res) @@ -350,7 +350,7 @@ def fix_missing_redshift(): if len(all_ids) == 0: logging.info('[FILL INFO] No ids obtained') database_api.close() - asyncio.run(pg_client.terminate()) + # asyncio.run(pg_client.terminate()) return # logging.info(f'[FILL INFO] {base_query}') try: @@ -359,11 +359,11 @@ def fix_missing_redshift(): logging.error(f'[ERROR] Error while executing query. {repr(e)}') logging.error(f'[ERROR INFO] query: {base_query}') database_api.close() - asyncio.run(pg_client.terminate()) + # asyncio.run(pg_client.terminate()) return logging.info(f'[FILL-INFO] {time() - t} - for {len(sessionids)} elements') database_api.close() - asyncio.run(pg_client.terminate()) + # asyncio.run(pg_client.terminate()) return @@ -488,6 +488,12 @@ class WorkerPool: except TimeoutError as e: print('[WORKER-TimeoutError] Decoding of messages is taking longer than expected') raise e + except Exception as e: + print(f'[Exception] {e}') + self.sessions_update_batch = dict() + self.sessions_insert_batch = dict() + self.events_batch = list() + continue session_ids, messages = self._pool_response_handler( pool_results=results) if current_loop_number == 0: @@ -500,7 +506,7 @@ class WorkerPool: main_conn.send('CONTINUE') print('[WORKER-INFO] Sending close signal') main_conn.send('CLOSE') - self.terminate() + self.terminate(database_api) kafka_reader_process.terminate() print('[WORKER-SHUTDOWN] Process terminated')