thread_lock added to all PrometheusAPI calls (#8553)
Signed-off-by: Daniel Osypenko <dosypenk@redhat.com>
DanielOsypenko authored Oct 19, 2023
1 parent 1eb8116 commit 79f00fc
Showing 39 changed files with 260 additions and 123 deletions.
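
Every file below follows the same pattern: the session-scoped threading_lock pytest fixture (now a threading.RLock) is passed down to each helper that talks to Prometheus, and PrometheusAPI refuses to construct without it. A minimal, self-contained sketch of that pattern, using only the standard library; the real PrometheusAPI does much more, only the new lock guard is reproduced here.

```python
# Self-contained sketch of the pattern this commit enforces; the real
# PrometheusAPI does much more, only the new lock guard is reproduced here.
import threading


class NoThreadingLockUsedError(Exception):
    pass


class PrometheusAPI:
    def __init__(self, user=None, password=None, threading_lock=None):
        # New in this commit: constructing the API without a lock is an error.
        if threading_lock is None:
            raise NoThreadingLockUsedError(
                "using threading.Lock object is mandatory for PrometheusAPI class"
            )
        self.threading_lock = threading_lock


# Callers share one re-entrant lock; in the test suite it comes from the
# session-scoped threading_lock fixture in tests/conftest.py.
shared_lock = threading.RLock()
api = PrometheusAPI(threading_lock=shared_lock)
```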
7 changes: 6 additions & 1 deletion ocs_ci/framework/pytest_customization/ocscilib.py
@@ -712,8 +712,13 @@ def pytest_runtest_makereport(item, call):
):
metrics = item.get_closest_marker("gather_metrics_on_fail").args
try:
threading_lock = call.getfixturevalue("threading_lock")
collect_prometheus_metrics(
metrics, f"{item.name}-{call.when}", call.start, call.stop
metrics,
f"{item.name}-{call.when}",
call.start,
call.stop,
threading_lock=threading_lock,
)
except Exception:
log.exception("Failed to collect prometheus metrics")
7 changes: 5 additions & 2 deletions ocs_ci/ocs/cluster.py
@@ -2387,7 +2387,9 @@ class for lvm cluster
"""

def __init__(self, fstrim=False, fail_on_thin_pool_not_empty=False):
def __init__(
self, fstrim=False, fail_on_thin_pool_not_empty=False, threading_lock=None
):
"""
Initialize the class; takes 3 parameters.
Args:
@@ -2405,6 +2407,7 @@ def __init__(self, fstrim=False, fail_on_thin_pool_not_empty=False):
self.vg_data = None
self.node_ssh = None
self.new_prom = None
self.threading_lock = threading_lock
func_list = [
self.cluster_ip(),
self.get_lvmcluster(),
@@ -2461,7 +2464,7 @@ def __init__(self, fstrim=False, fail_on_thin_pool_not_empty=False):
thread_init_class(func_list, shutdown=0)

def init_prom(self):
self.new_prom = PrometheusAPI()
self.new_prom = PrometheusAPI(threading_lock=self.threading_lock)

def get_lvmcluster(self):
"""
5 changes: 3 additions & 2 deletions ocs_ci/ocs/cluster_load.py
@@ -52,6 +52,7 @@ def __init__(
sa_factory=None,
pod_factory=None,
target_percentage=None,
threading_lock=None,
):
"""
Initializer for ClusterLoad
@@ -63,9 +64,9 @@ def __init__(
pod_factory (function): A call to pod_factory function
target_percentage (float): The percentage of cluster load that is
required. The value should be greater than 0.1 and smaller than 0.95
threading_lock (threading.RLock): A threading.RLock object to be used as the threading lock
"""
self.prometheus_api = PrometheusAPI()
self.prometheus_api = PrometheusAPI(threading_lock=threading_lock)
self.pvc_factory = pvc_factory
self.sa_factory = sa_factory
self.pod_factory = pod_factory
4 changes: 4 additions & 0 deletions ocs_ci/ocs/exceptions.py
@@ -647,5 +647,9 @@ class UnableUpgradeConnectionException(Exception):
pass


class NoThreadingLockUsedError(Exception):
pass


class VSLMNotFoundException(Exception):
pass
3 changes: 3 additions & 0 deletions ocs_ci/ocs/fiojob.py
@@ -346,6 +346,7 @@ def workload_fio_storageutilization(
keep_fio_data=False,
minimal_time=480,
throw_skip=True,
threading_lock=None,
):
"""
This function implements core functionality of fio storage utilization
@@ -392,6 +393,7 @@ def workload_fio_storageutilization(
(See more details in the function 'measure_operation')
throw_skip (bool): if True function will raise pytest.skip.Exception and test will be skipped,
otherwise return None
threading_lock (threading.RLock): lock to be used for thread synchronization when calling 'oc' cmd
Returns:
dict: measurement results with timestamps and other medatada from
@@ -537,6 +539,7 @@ def workload_fio_storageutilization(
test_file,
measure_after=True,
minimal_time=minimal_time,
threading_lock=threading_lock,
)

# we don't need to delete anything if this fixture has been already
35 changes: 23 additions & 12 deletions ocs_ci/ocs/monitoring.py
@@ -151,16 +151,19 @@ def get_list_pvc_objs_created_on_monitoring_pods():


@retry(ServiceUnavailable, tries=60, delay=3, backoff=1)
def get_metrics_persistentvolumeclaims_info():
def get_metrics_persistentvolumeclaims_info(threading_lock):
"""
Returns the created pvc information on prometheus pod
Args:
threading_lock (threading.RLock): A lock to prevent multiple threads calling 'oc' command at the same time
Returns:
response.content (dict): The pvc metrics collected on prometheus pod
"""

prometheus = ocs_ci.utility.prometheus.PrometheusAPI()
prometheus = ocs_ci.utility.prometheus.PrometheusAPI(threading_lock=threading_lock)
response = prometheus.get(
"query?query=kube_pod_spec_volumes_persistentvolumeclaims_info"
)
@@ -170,12 +173,13 @@ def get_metrics_persistentvolumeclaims_info():


@retry(UnexpectedBehaviour, tries=60, delay=3, backoff=1)
def check_pvcdata_collected_on_prometheus(pvc_name):
def check_pvcdata_collected_on_prometheus(pvc_name, threading_lock):
"""
Checks whether initially pvc related data is collected on pod
Args:
pvc_name (str): Name of the pvc
threading_lock (threading.RLock): A lock to prevent multiple threads calling 'oc' command at the same time
Returns:
True on success, raises UnexpectedBehaviour on failures
@@ -184,7 +188,7 @@ def check_pvcdata_collected_on_prometheus(pvc_name):
logger.info(
f"Verify for created pvc {pvc_name} related data is collected on prometheus pod"
)
pvcs_data = get_metrics_persistentvolumeclaims_info()
pvcs_data = get_metrics_persistentvolumeclaims_info(threading_lock=threading_lock)
list_pvcs_data = pvcs_data.get("data").get("result")
pvc_list = [
pvc
@@ -201,18 +205,19 @@ def check_pvcdata_collected_on_prometheus(pvc_name):
return True


def check_ceph_health_status_metrics_on_prometheus(mgr_pod):
def check_ceph_health_status_metrics_on_prometheus(mgr_pod, threading_lock):
"""
Check ceph health status metric is collected on prometheus pod
Args:
mgr_pod (str): Name of the mgr pod
threading_lock (threading.RLock): Threading lock object to ensure only one thread is making 'oc' calls
Returns:
bool: True on success, false otherwise
"""
prometheus = ocs_ci.utility.prometheus.PrometheusAPI()
prometheus = ocs_ci.utility.prometheus.PrometheusAPI(threading_lock=threading_lock)
response = prometheus.get("query?query=ceph_health_status")
ceph_health_metric = json.loads(response.content.decode("utf-8"))
return bool(
@@ -261,17 +266,20 @@ def prometheus_health_check(name=constants.MONITORING, kind=constants.CLUSTER_OP
return False


def check_ceph_metrics_available():
def check_ceph_metrics_available(threading_lock):
"""
Check that all healthy ceph metrics are available.
Args:
threading_lock (threading.RLock): A lock to use for thread-safe 'oc' calls
Returns:
bool: True on success, false otherwise
"""
logger.info("check ceph metrics available")
# Check ceph metrics available
prometheus = ocs_ci.utility.prometheus.PrometheusAPI()
prometheus = ocs_ci.utility.prometheus.PrometheusAPI(threading_lock=threading_lock)
list_of_metrics_without_results = metrics.get_missing_metrics(
prometheus,
metrics.ceph_metrics_healthy,
@@ -319,15 +327,18 @@ def get_prometheus_response(api, query) -> dict:
return json.loads(resp.text)


def get_pvc_namespace_metrics():
def get_pvc_namespace_metrics(threading_lock):
"""
Get PVC and Namespace metrics from Prometheus.
Args:
threading_lock (threading.RLock): A lock to use for thread-safe 'oc' calls
Returns:
dict: A dictionary containing the PVC and Namespace metrics data
"""

api = ocs_ci.utility.prometheus.PrometheusAPI()
api = ocs_ci.utility.prometheus.PrometheusAPI(threading_lock=threading_lock)

pvc_namespace = {}

@@ -354,7 +365,7 @@ def get_pvc_namespace_metrics():
return pvc_namespace


def get_ceph_capacity_metrics():
def get_ceph_capacity_metrics(threading_lock):
"""
Get CEPH capacity breakdown data from Prometheus, return all response texts collected to a dict
Use the queries from ceph-storage repo:
@@ -366,7 +377,7 @@ def get_ceph_capacity_metrics():
Returns:
dict: A dictionary containing the CEPH capacity breakdown data
"""
api = ocs_ci.utility.prometheus.PrometheusAPI()
api = ocs_ci.utility.prometheus.PrometheusAPI(threading_lock=threading_lock)

ceph_capacity = {}
logger.info("Get CEPH capacity breakdown data from Prometheus")
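
All of the monitoring helpers above now take the lock as an explicit argument instead of constructing a bare PrometheusAPI. A usage sketch under the assumption that ocs_ci is installed and configured against a live cluster; the function names and signatures come from the diff above, while the surrounding code is illustrative only.

```python
# Usage sketch only: requires an installed, configured ocs_ci environment and
# a reachable cluster. The lock would normally come from the threading_lock
# pytest fixture rather than being created ad hoc as it is here.
import threading

from ocs_ci.ocs import monitoring

lock = threading.RLock()

# Each helper forwards the lock to PrometheusAPI internally.
if monitoring.check_ceph_metrics_available(threading_lock=lock):
    capacity = monitoring.get_ceph_capacity_metrics(threading_lock=lock)
    pvc_info = monitoring.get_metrics_persistentvolumeclaims_info(
        threading_lock=lock
    )
```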
2 changes: 1 addition & 1 deletion ocs_ci/ocs/ocp.py
@@ -67,7 +67,7 @@ def __init__(
field_selector (str): Selector (field query) to filter on, supports
'=', '==', and '!='. (e.g. status.phase=Running)
cluster_kubeconfig (str): Path to the cluster kubeconfig file. Useful in a multicluster configuration
threading_lock (threading.Lock): threading.Lock object that is used
threading_lock (threading.RLock): threading.RLock object that is used
for handling concurrent oc commands
silent (bool): If True will silent errors from the server, default false
skip_tls_verify (bool): Adding '--insecure-skip-tls-verify' to oc command for
4 changes: 3 additions & 1 deletion ocs_ci/ocs/utils.py
@@ -1319,6 +1319,7 @@ def collect_prometheus_metrics(
start,
stop,
step=1.0,
threading_lock=None,
):
"""
Collects metrics from Prometheus and saves them in file in json format.
@@ -1333,8 +1334,9 @@
start (str): start timestamp of required datapoints
stop (str): stop timestamp of required datapoints
step (float): step of required datapoints
threading_lock (threading.RLock): Lock to use for thread safety (default: None)
"""
api = PrometheusAPI()
api = PrometheusAPI(threading_lock=threading_lock)
log_dir_path = os.path.join(
os.path.expanduser(ocsci_config.RUN["log_dir"]),
f"failed_testcase_ocs_logs_{ocsci_config.RUN['run_id']}",
7 changes: 6 additions & 1 deletion ocs_ci/utility/prometheus.py
@@ -10,7 +10,7 @@

from ocs_ci.framework import config
from ocs_ci.ocs import constants, defaults
from ocs_ci.ocs.exceptions import AlertingError, AuthError
from ocs_ci.ocs.exceptions import AlertingError, AuthError, NoThreadingLockUsedError
from ocs_ci.ocs.ocp import OCP
from ocs_ci.utility.ssl_certs import get_root_ca_cert
from ocs_ci.utility.utils import TimeoutIterator
@@ -330,6 +330,11 @@ def __init__(self, user=None, password=None, threading_lock=None):
Args:
user (str): OpenShift username used to connect to API
"""
if threading_lock is None:
raise NoThreadingLockUsedError(
"using threading.Lock object is mandatory for PrometheusAPI class"
)

if (
config.ENV_DATA["platform"].lower() == "ibm_cloud"
and config.ENV_DATA["deployment_type"] == "managed"
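
With the guard above, forgetting to pass the lock fails fast: PrometheusAPI() now raises NoThreadingLockUsedError before any cluster access is attempted. A short sketch of that failure mode, assuming ocs_ci is importable in the current environment; a real, working instantiation additionally needs a reachable cluster.

```python
# Sketch of the new fail-fast behaviour; assumes ocs_ci is importable.
import threading

from ocs_ci.ocs.exceptions import NoThreadingLockUsedError
from ocs_ci.utility.prometheus import PrometheusAPI

try:
    PrometheusAPI()  # no threading_lock -> rejected by the new guard
except NoThreadingLockUsedError as err:
    print(f"rejected as expected: {err}")

# The supported call site always passes the shared re-entrant lock; note that
# this line also needs a reachable cluster to finish initialization.
api = PrometheusAPI(threading_lock=threading.RLock())
```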
4 changes: 2 additions & 2 deletions ocs_ci/utility/utils.py
@@ -471,7 +471,7 @@ def run_cmd(
timeout (int): Timeout for the command, defaults to 600 seconds.
ignore_error (bool): True if ignore non zero return code and do not
raise the exception.
threading_lock (threading.Lock): threading.Lock object that is used
threading_lock (threading.RLock): threading.RLock object that is used
for handling concurrent oc commands
silent (bool): If True will silent errors from the server, default false
@@ -601,7 +601,7 @@ def exec_cmd(
timeout (int): Timeout for the command, defaults to 600 seconds.
ignore_error (bool): True if ignore non zero return code and do not
raise the exception.
threading_lock (threading.Lock): threading.Lock object that is used
threading_lock (threading.RLock): threading.RLock object that is used
for handling concurrent oc commands
silent (bool): If True will silent errors from the server, default false
use_shell (bool): If True will pass the cmd without splitting
2 changes: 1 addition & 1 deletion ocs_ci/utility/workloadfixture.py
@@ -65,7 +65,7 @@ def measure_operation(
and utilized data are measured after the utilization is completed
pagerduty_service_ids (list): Service IDs from PagerDuty system used
incidents query
threading_lock (threading.Lock): Lock used for synchronization of the threads in Prometheus calls
threading_lock (threading.RLock): Lock used for synchronization of the threads in Prometheus calls
Returns:
dict: contains information about `start` and `stop` time of given
9 changes: 6 additions & 3 deletions tests/conftest.py
@@ -430,9 +430,10 @@ def threading_lock():
threading.RLock object that can be used in threads across multiple tests.
Returns:
threading.Lock: lock object
threading.RLock: Reentrant lock object. A reentrant lock (or RLock) is a type of lock that allows the same
thread to acquire the lock multiple times without causing a deadlock
"""
return threading.Lock()
return threading.RLock()


@pytest.fixture(scope="session", autouse=True)
@@ -1772,6 +1773,7 @@ def cluster_load(
pvc_factory_session,
service_account_factory_session,
pod_factory_session,
threading_lock,
):
"""
Run IO during the test execution
@@ -1805,6 +1807,7 @@
pvc_factory=pvc_factory_session,
pod_factory=pod_factory_session,
target_percentage=io_load,
threading_lock=threading_lock,
)
cl_load_obj.reach_cluster_load_percentage()
except Exception as ex:
@@ -1814,7 +1817,7 @@
if (log_utilization or io_in_bg) and not deployment_test:
if not cl_load_obj:
try:
cl_load_obj = ClusterLoad()
cl_load_obj = ClusterLoad(threading_lock=threading_lock)
except Exception as ex:
log.error(cluster_load_error_msg, ex)
cluster_load_error = ex
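
The fixture now returns threading.RLock instead of threading.Lock, which, per the updated docstring, lets the same thread re-acquire the lock without deadlocking itself. A stdlib-only illustration of the difference (not code from the repository):

```python
# Stdlib-only illustration of why the session fixture returns an RLock.
import threading

rlock = threading.RLock()


def nested_call(lock):
    with lock:       # outer acquire
        with lock:   # inner acquire by the same thread
            return "ok"


print(nested_call(rlock))  # prints "ok": RLock is re-entrant

# With a plain threading.Lock the inner `with lock:` above would block forever,
# because Lock does not track ownership and cannot be re-acquired by the thread
# that already holds it.
```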
7 changes: 5 additions & 2 deletions tests/e2e/system_test/test_cluster_full_and_recovery.py
@@ -50,6 +50,7 @@ def test_cluster_full_and_recovery(
pvc_factory,
pod_factory,
project_factory,
threading_lock,
):
"""
1.Create PVC1 [FS + RBD]
@@ -159,6 +160,7 @@
sleep=50,
func=self.verify_alerts_via_prometheus,
expected_alerts=expected_alerts,
threading_lock=threading_lock,
)
if not sample.wait_for_func_status(result=True):
log.error(f"The alerts {expected_alerts} do not exist after 600 sec")
@@ -317,18 +319,19 @@ def verify_osd_used_capacity_greater_than_expected(self, expected_used_capacity)
return True
return False

def verify_alerts_via_prometheus(self, expected_alerts):
def verify_alerts_via_prometheus(self, expected_alerts, threading_lock):
"""
Verify Alerts on prometheus
Args:
expected_alerts (list): list of alert names
threading_lock (threading.RLock): Lock object to prevent simultaneous calls to 'oc'
Returns:
bool: True if expected_alerts exist, False otherwise
"""
prometheus = PrometheusAPI()
prometheus = PrometheusAPI(threading_lock=threading_lock)
log.info("Logging of all prometheus alerts started")
alerts_response = prometheus.get(
"alerts", payload={"silenced": False, "inhibited": False}