Skip to content

Commit

Permalink
Support monitoring api for cpu/memory utilization
Browse files Browse the repository at this point in the history
  • Loading branch information
gargnitingoogle committed Aug 21, 2024
1 parent 7037e2b commit c7bf270
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 1 deletion.
11 changes: 10 additions & 1 deletion perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,16 @@ function installDependencies() {
sudo apt install docker-ce -y
fi
# Ensure that mash is installed.
which mash || (sudo apt-get update && sudo apt-get install -y monarch-tools)
if ! which mash ; then
if ! sudo apt-get install -y monarch-tools; then
# Ensure that gcloud monitoring tools are installed. This is alternative to
# mash on gce vm.
# pip install --upgrade google-cloud-storage
# pip install --ignore-installed --upgrade google-api-python-client
# pip install --ignore-installed --upgrade google-cloud
pip install --upgrade google-cloud-monitoring
fi
fi
}

# Make sure you have access to the necessary GCP resources. The easiest way to enable it is to use <your-ldap>@google.com as active auth.
Expand Down
147 changes: 147 additions & 0 deletions perfmetrics/scripts/testing_on_gke/examples/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
# limitations under the License.

import datetime, subprocess
import math
import time
from typing import Tuple
from google.cloud import monitoring_v3


def is_mash_installed() -> bool:
Expand Down Expand Up @@ -167,3 +170,147 @@ def resource_limits(nodeType: str) -> Tuple[dict, dict]:
" resource-limits for it.",
nodeType,
)


def isRelevantMonitoringResult(
result,
cluster_name: str,
pod_name: str,
# container_name: str,
namespace_name: str,
) -> bool:
return (
True
if (
hasattr(result, "resource")
and hasattr(result.resource, "type")
and result.resource.type == "k8s_container"
and hasattr(result.resource, "labels")
and "cluster_name" in result.resource.labels
and result.resource.labels["cluster_name"] == cluster_name
and "pod_name" in result.resource.labels
and result.resource.labels["pod_name"] == pod_name
# and "container_name" in result.resource.labels
# and result.resource.labels["container_name"] == container_name
and "namespace_name" in result.resource.labels
and result.resource.labels["namespace_name"] == namespace_name
and hasattr(result, "points")
)
else False
)


def get_memory_from_monitoring_api(
project_id: str,
cluster_name: str,
pod_name: str,
# container_name: str,
namespace_name: str,
start_epoch: int,
end_epoch: int,
) -> int:
"""Returns peak memory usage of the given gke-cluster/namespace/pod/container/ scenario in MiB/s ."""
client = monitoring_v3.MetricServiceClient()
project_name = f"projects/{project_id}"

interval = monitoring_v3.TimeInterval({
"start_time": {"seconds": start_epoch, "nanos": 0},
"end_time": {"seconds": end_epoch, "nanos": 0},
})
aggregation = monitoring_v3.Aggregation({
"alignment_period": {"seconds": 600}, # 10 minutes
"per_series_aligner": monitoring_v3.Aggregation.Aligner.ALIGN_MAX,
})

results = client.list_time_series(
request={
"name": project_name,
"filter": 'metric.type = "kubernetes.io/container/memory/used_bytes"',
"interval": interval,
"view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
"aggregation": aggregation,
}
)

relevant_results = [
result
for result in results
if isRelevantMonitoringResult(
result,
cluster_name,
pod_name,
# container_name,
namespace_name,
)
]
return round(
max(
max(
(point.value.int64_value if point.value.int64_value > 0 else 0)
for point in result.points
)
for result in relevant_results
)
/ 2**20, # convert to MiB/s
0, # round to integer.
)


def get_cpu_from_monitoring_api(
project_id: str,
cluster_name: str,
pod_name: str,
# container_name: str,
namespace_name: str,
start_epoch: int,
end_epoch: int,
) -> float:
client = monitoring_v3.MetricServiceClient()
project_name = f"projects/{project_id}"

interval = monitoring_v3.TimeInterval({
"start_time": {"seconds": start_epoch, "nanos": 0},
"end_time": {"seconds": end_epoch, "nanos": 0},
})
aggregation = monitoring_v3.Aggregation({
"alignment_period": {"seconds": 600}, # 10 minutes
"per_series_aligner": monitoring_v3.Aggregation.Aligner.ALIGN_RATE,
})

results = client.list_time_series(
request={
"name": project_name,
"filter": (
'metric.type = "kubernetes.io/container/cpu/core_usage_time"'
),
"interval": interval,
"view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
"aggregation": aggregation,
}
)

relevant_results = [
result
for result in results
if isRelevantMonitoringResult(
result,
cluster_name,
pod_name,
# container_name,
namespace_name,
)
]
return round(
max(
max(
(
point.value.double_value
if point.value.double_value != math.nan
else 0
)
for point in result.points
)
for result in relevant_results
),
5, # round up to 5 decimal places.
)
52 changes: 52 additions & 0 deletions perfmetrics/scripts/testing_on_gke/examples/utils/utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""This file defines unit tests for functionalities in utils.py"""

import unittest
import utils
from utils import get_cpu_from_monitoring_api, get_memory_from_monitoring_api


class UtilsTest(unittest.TestCase):

def test_get_cpu_from_monitoring_api(self):
project_id = "gcs-fuse-test"
cluster_name = "gargnitin-dryrun-us-west1-6"
pod_name = "fio-tester-gcsfuse-rr-64k-1670041227260535313"
# container_name = "fio-tester"
namespace_name = "default"
start_epoch = 1724233283
end_epoch = 1724233442
print(
get_cpu_from_monitoring_api(
project_id,
cluster_name,
pod_name,
# container_name,
namespace_name,
start_epoch,
end_epoch,
)
)

def test_get_memory_from_monitoring_api(self):
project_id = "gcs-fuse-test"
cluster_name = "gargnitin-dryrun-us-west1-6"
pod_name = "fio-tester-gcsfuse-rr-64k-1670041227260535313"
# container_name = "fio-tester"
namespace_name = "default"
start_epoch = 1724233283
end_epoch = 1724233442
print(
get_memory_from_monitoring_api(
project_id,
cluster_name,
pod_name,
# container_name,
namespace_name,
start_epoch,
end_epoch,
)
)


if __name__ == "__main__":
unittest.main()

0 comments on commit c7bf270

Please sign in to comment.