openreplay/ee/connectors/fill_from_db.py
MauricioGarciaS 91921d622e
fix(connectors): Created module in Cython to speed up message decoding (#1338)
* Updated pg connector

* fix(player): fix first 8-byte checker

* fix(player): fix commit conflict

* Added pylint

* Removed pylint for incompatible license

* change(ui): check for sessions records state

* Patch/api v1.12.0 (#1299)

* fix(chalice): include metadata in sessions exp search

* fix(chalice): fixed sessions exp search wrong col name

* fix(chalice): removed cookies

* fix(chalice): changed base image to support SSO/xmlsec

* fix(chalice): changed Dockerfile to support SSO/xmlsec

* fix(chalice): changed Dockerfile to support SSO/xmlsec

(cherry picked from commit 4b8cf9742c)

* fix(ui): project fallback to recorded variable

* Patch/api v1.12.0 (#1301)

* fix(chalice): changed base image to support SSO/xmlsec

* fix(chalice): fixed exp search null metadata

(cherry picked from commit ab000751d2)

* change(ui): assist no content message styles and icons

* change(ui): revert menu disable

* fix(connector): Added method to save state in S3 for Redshift if SIGTERM arises

* Rewriting Python code in Cython

* Added pyx module for messages

* Auto create pyx files

* Updated and fixed msgcodec.pyx

* Added new module to connector code

* Updated kafka lib for base image

* cleaned Docker and updated base image version for pandas

* cleaned prints

* Added code to fetch data from db and add it into redshift

* Updated consumer reading method. Async multithreading over sessionId

* Added split for country (Country,State,City)

* Fixed decoding issue for uint

* Created service able to fix data from redshift by reading from db

* Handle the case where the process ended because the connection to PG was lost; country is now set to the country value only
2023-06-23 14:49:39 +02:00

60 lines
2.2 KiB
Python

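# Backfill service: fixes Redshift session rows by reading from the Postgres db.
# Rows whose user_id is the placeholder string 'NULL' get the user_id recorded
# in the Postgres sessions table.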
import pandas_redshift as pdredshift
from utils import pg_client
from decouple import config, Choices
import asyncio

DATABASE = config('CLOUD_SERVICE')
sessions_table_name = config('SESSIONS_TABLE', default='connector_user_sessions')
table = sessions_table_name
sslmode = config('DB_SSLMODE',
                 cast=Choices(['disable', 'allow', 'prefer', 'require', 'verify-ca', 'verify-full']),
                 default='allow')
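
# Connection settings come either from individual env vars (USER, HOST, PORT,
# PASSWORD, DBNAME) or from a single 'cluster_info' string of space-separated
# key=value pairs.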
ci = config('cluster_info', default='')
cluster_info = dict()
if ci == '':
    cluster_info['USER'] = config('USER')
    cluster_info['HOST'] = config('HOST')
    cluster_info['PORT'] = config('PORT')
    cluster_info['PASSWORD'] = config('PASSWORD')
    cluster_info['DBNAME'] = config('DBNAME')
else:
    for _d in ci.split(' '):
        k, v = _d.split('=')
        cluster_info[k] = v
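
# Open the module-level Redshift connection used by the pandas_redshift helpers.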
pdredshift.connect_to_redshift(dbname=cluster_info['DBNAME'],
                               host=cluster_info['HOST'],
                               port=cluster_info['PORT'],
                               user=cluster_info['USER'],
                               password=cluster_info['PASSWORD'],
                               sslmode=sslmode)
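

# Find Redshift sessions whose user_id is the placeholder string 'NULL', look up
# the real user_id in Postgres, and write it back to the Redshift table.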
async def main(limit=100):
    query = "SELECT sessionid FROM {table} WHERE user_id = 'NULL' LIMIT {limit}"
    try:
        res = pdredshift.redshift_to_pandas(query.format(table=table, limit=limit))
    except Exception as e:
        print(repr(e))
        return  # res is undefined if the query failed; len(res) below would raise
    if len(res) == 0:
        return
    sessionids = [str(session_id) for session_id in res['sessionid']]
    await pg_client.init()
    with pg_client.PostgresClient() as conn:
        conn.execute('SELECT session_id, user_id FROM sessions WHERE session_id IN ({session_id_list})'.format(
            session_id_list=','.join(sessionids)))
        pg_res = conn.fetchall()
    base_query = "UPDATE {table} SET user_id = '{user_id}' WHERE sessionid = {session_id}"
    for row in pg_res:
        query = base_query.format(table=table, user_id=row['user_id'], session_id=row['session_id'])
        try:
            pdredshift.exec_commit(query)
        except Exception as e:
            print(repr(e))


if __name__ == '__main__':
    asyncio.run(main())