feat(chalice): Kafka health check
This commit is contained in:
parent
e417e479c7
commit
f2172607ed
3 changed files with 34 additions and 34 deletions
|
|
@ -56,7 +56,7 @@ def __get_live_sessions_ws(project_id, data):
|
|||
results = requests.post(ASSIST_URL + config("assist") + f"/{project_key}",
|
||||
json=data, timeout=config("assistTimeout", cast=int, default=5))
|
||||
if results.status_code != 200:
|
||||
print(f"!! issue with the peer-server code:{results.status_code}")
|
||||
print(f"!! issue with the peer-server code:{results.status_code} for __get_live_sessions_ws")
|
||||
print(results.text)
|
||||
return {"total": 0, "sessions": []}
|
||||
live_peers = results.json().get("data", [])
|
||||
|
|
@ -106,7 +106,7 @@ def get_live_session_by_id(project_id, session_id):
|
|||
results = requests.get(ASSIST_URL + config("assist") + f"/{project_key}/{session_id}",
|
||||
timeout=config("assistTimeout", cast=int, default=5))
|
||||
if results.status_code != 200:
|
||||
print(f"!! issue with the peer-server code:{results.status_code}")
|
||||
print(f"!! issue with the peer-server code:{results.status_code} for get_live_session_by_id")
|
||||
print(results.text)
|
||||
return None
|
||||
results = results.json().get("data")
|
||||
|
|
@ -136,7 +136,7 @@ def is_live(project_id, session_id, project_key=None):
|
|||
results = requests.get(ASSIST_URL + config("assistList") + f"/{project_key}/{session_id}",
|
||||
timeout=config("assistTimeout", cast=int, default=5))
|
||||
if results.status_code != 200:
|
||||
print(f"!! issue with the peer-server code:{results.status_code}")
|
||||
print(f"!! issue with the peer-server code:{results.status_code} for is_live")
|
||||
print(results.text)
|
||||
return False
|
||||
results = results.json().get("data")
|
||||
|
|
@ -165,7 +165,7 @@ def autocomplete(project_id, q: str, key: str = None):
|
|||
ASSIST_URL + config("assistList") + f"/{project_key}/autocomplete",
|
||||
params=params, timeout=config("assistTimeout", cast=int, default=5))
|
||||
if results.status_code != 200:
|
||||
print(f"!! issue with the peer-server code:{results.status_code}")
|
||||
print(f"!! issue with the peer-server code:{results.status_code} for autocomplete")
|
||||
print(results.text)
|
||||
return {"errors": [f"Something went wrong wile calling assist:{results.text}"]}
|
||||
results = results.json().get("data", [])
|
||||
|
|
@ -248,7 +248,7 @@ def session_exists(project_id, session_id):
|
|||
results = requests.get(ASSIST_URL + config("assist") + f"/{project_key}/{session_id}",
|
||||
timeout=config("assistTimeout", cast=int, default=5))
|
||||
if results.status_code != 200:
|
||||
print(f"!! issue with the peer-server code:{results.status_code}")
|
||||
print(f"!! issue with the peer-server code:{results.status_code} for session_exists")
|
||||
print(results.text)
|
||||
return None
|
||||
results = results.json().get("data")
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ from urllib.parse import urlparse
|
|||
|
||||
import redis
|
||||
import requests
|
||||
# from confluent_kafka.admin import AdminClient
|
||||
from confluent_kafka.admin import AdminClient
|
||||
from decouple import config
|
||||
|
||||
from chalicelib.utils import pg_client, ch_client
|
||||
|
|
@ -149,8 +149,7 @@ def get_health():
|
|||
"ingestionPipeline": {
|
||||
**({"redis": __check_redis} if config("REDIS_STRING", default=None)
|
||||
and len(config("REDIS_STRING")) > 0 else {}),
|
||||
# "kafka": __check_kafka
|
||||
"kafka": __always_healthy
|
||||
"kafka": __check_kafka
|
||||
},
|
||||
"backendServices": {
|
||||
"alerts": __check_be_service("alerts"),
|
||||
|
|
@ -210,28 +209,29 @@ def __check_database_ch():
|
|||
}
|
||||
}
|
||||
|
||||
# def __check_kafka():
|
||||
# fail_response = {
|
||||
# "health": False,
|
||||
# "details": {"errors": ["server health-check failed"]}
|
||||
# }
|
||||
# if config("KAFKA_SERVERS", default=None) is None:
|
||||
# fail_response["details"]["errors"].append("KAFKA_SERVERS not defined in env-vars")
|
||||
# return fail_response
|
||||
#
|
||||
# try:
|
||||
# a = AdminClient({'bootstrap.servers': config("KAFKA_SERVERS"), "socket.connection.setup.timeout.ms": 3000})
|
||||
# topics = a.list_topics().topics
|
||||
# if not topics:
|
||||
# raise Exception('topics not found')
|
||||
#
|
||||
# except Exception as e:
|
||||
# print("!! Issue getting kafka-health response")
|
||||
# print(str(e))
|
||||
# fail_response["details"]["errors"].append(str(e))
|
||||
# return fail_response
|
||||
#
|
||||
# return {
|
||||
# "health": True,
|
||||
# "details": {}
|
||||
# }
|
||||
|
||||
def __check_kafka():
|
||||
fail_response = {
|
||||
"health": False,
|
||||
"details": {"errors": ["server health-check failed"]}
|
||||
}
|
||||
if config("KAFKA_SERVERS", default=None) is None:
|
||||
fail_response["details"]["errors"].append("KAFKA_SERVERS not defined in env-vars")
|
||||
return fail_response
|
||||
|
||||
try:
|
||||
a = AdminClient({'bootstrap.servers': config("KAFKA_SERVERS"), "socket.connection.setup.timeout.ms": 3000})
|
||||
topics = a.list_topics().topics
|
||||
if not topics:
|
||||
raise Exception('topics not found')
|
||||
|
||||
except Exception as e:
|
||||
print("!! Issue getting kafka-health response")
|
||||
print(str(e))
|
||||
fail_response["details"]["errors"].append(str(e))
|
||||
return fail_response
|
||||
|
||||
return {
|
||||
"health": True,
|
||||
"details": {}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,4 +19,4 @@ python3-saml==1.15.0
|
|||
python-multipart==0.0.6
|
||||
|
||||
redis==4.5.3
|
||||
#confluent-kafka==2.0.2
|
||||
confluent-kafka==2.1.0
|
||||
Loading…
Add table
Reference in a new issue