From 0001083089144d9efd9a11f5d8c673536941fa41 Mon Sep 17 00:00:00 2001
From: Matheus Tosta
Date: Wed, 27 Nov 2024 10:47:35 -0400
Subject: [PATCH 01/13] PENG-2457 implement logic to retrieve Slurm jobs
 metrics data from InfluxDB and send it to the API

---
 jobbergate-agent/CHANGELOG.md                 |   2 +
 .../jobbergate_agent/clients/influx.py        |  27 +
 .../jobbergate_agent/jobbergate/constants.py  |  16 +
 .../jobbergate_agent/jobbergate/schemas.py    |  46 +-
 .../jobbergate_agent/jobbergate/update.py     | 150 ++++-
 jobbergate-agent/jobbergate_agent/settings.py |  43 +-
 jobbergate-agent/poetry.lock                  |  96 ++-
 jobbergate-agent/pyproject.toml               |   4 +-
 jobbergate-agent/tests/conftest.py            |   3 +-
 .../tests/jobbergate/test_constants.py        |  10 +
 .../tests/jobbergate/test_update.py           | 579 +++++++++++++++++-
 .../tests/test_influxdb_client.py             |  70 +++
 jobbergate-agent/tests/test_settings.py       |  36 ++
 13 files changed, 1060 insertions(+), 22 deletions(-)
 create mode 100644 jobbergate-agent/jobbergate_agent/clients/influx.py
 create mode 100644 jobbergate-agent/tests/jobbergate/test_constants.py
 create mode 100644 jobbergate-agent/tests/test_influxdb_client.py

diff --git a/jobbergate-agent/CHANGELOG.md b/jobbergate-agent/CHANGELOG.md
index 323b9206d..05f717a33 100644
--- a/jobbergate-agent/CHANGELOG.md
+++ b/jobbergate-agent/CHANGELOG.md
@@ -4,6 +4,8 @@ This file keeps track of all notable changes to jobbergate-agent
 
 ## Unreleased
 
+- Implement logic to retrieve job metrics data from InfluxDB and send it to the API. ([PENG-2457](https://sharing.clickup.com/t/h/c/18022949/PENG-2457/BU7UOA63B936N27))
+
 ## 5.4.0 -- 2024-11-18
 
 - Changed auto-update task to reuse current scheduler instead of creating a new one
diff --git a/jobbergate-agent/jobbergate_agent/clients/influx.py b/jobbergate-agent/jobbergate_agent/clients/influx.py
new file mode 100644
index 000000000..c3eb1411a
--- /dev/null
+++ b/jobbergate-agent/jobbergate_agent/clients/influx.py
@@ -0,0 +1,27 @@
+"""Core module for defining the InfluxDB client."""
+
+from influxdb import InfluxDBClient
+from loguru import logger
+
+from jobbergate_agent.settings import SETTINGS
+
+
+def initialize_influx_client() -> None | InfluxDBClient:
+    """Initialize the InfluxDB client."""
+    if SETTINGS.INFLUX_INTEGRATION_ENABLED:
+        logger.debug("InfluxDB integration is enabled. Initializing InfluxDB client...")
+        return InfluxDBClient.from_dsn(
+            str(SETTINGS.INFLUX_DSN),
+            pool_size=SETTINGS.INFLUX_POOL_SIZE,
+            ssl=SETTINGS.INFLUX_SSL,
+            verify_ssl=SETTINGS.INFLUX_VERIFY_SSL,
+            timeout=SETTINGS.INFLUX_TIMEOUT,
+            udp_port=SETTINGS.INFLUX_UDP_PORT,
+            cert=SETTINGS.INFLUX_CERT_PATH,
+        )
+    else:
+        logger.debug("InfluxDB integration is disabled")
+        return None
+
+
+INFLUXDB_CLIENT = initialize_influx_client()
diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/constants.py b/jobbergate-agent/jobbergate_agent/jobbergate/constants.py
index d67cba10b..ee17cd0e3 100644
--- a/jobbergate-agent/jobbergate_agent/jobbergate/constants.py
+++ b/jobbergate-agent/jobbergate_agent/jobbergate/constants.py
@@ -1,3 +1,5 @@
+from typing import Literal
+
 from auto_name_enum import AutoNameEnum, auto
 
 
@@ -6,3 +8,17 @@ class FileType(AutoNameEnum):
 
     ENTRYPOINT = auto()
     SUPPORT = auto()
+
+
+INFLUXDB_MEASUREMENT = Literal[
+    "CPUFrequency",
+    "CPUTime",
+    "CPUUtilization",
+    "GPUMemMB",
+    "GPUUtilization",
+    "Pages",
+    "RSS",
+    "ReadMB",
+    "VMSize",
+    "WriteMB",
+]
diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
index 5f1a25e3d..d85bec017 100644
--- a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
+++ b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
@@ -1,10 +1,10 @@
 from pathlib import Path
-from typing import List, Optional
+from typing import List, Optional, TypedDict
 
 import pydantic
 from pydantic import ConfigDict
 
-from jobbergate_agent.jobbergate.constants import FileType
+from jobbergate_agent.jobbergate.constants import FileType, INFLUXDB_MEASUREMENT
 
 
 class JobScriptFile(pydantic.BaseModel, extra="ignore"):
@@ -78,3 +78,45 @@ class SlurmJobData(pydantic.BaseModel, extra="ignore"):
     job_state: Optional[str] = None
     job_info: Optional[str] = None
     state_reason: Optional[str] = None
+
+
+class InfluxDBMeasurement(TypedDict):
+    """
+    Map each entry in the list returned by `InfluxDBClient(...).get_list_measurements(...)`.
+    """
+
+    name: INFLUXDB_MEASUREMENT
+
+
+class InfluxDBMeasure(TypedDict):
+    """
+    Map each entry in the generator returned by InfluxDBClient(...).query(...).get_points().
+    """
+
+    time: int
+    host: str
+    job: str
+    step: str
+    task: str
+    value: float
+    measurement: INFLUXDB_MEASUREMENT
+
+
+class JobSubmissionMetricsMaxTimes(pydantic.BaseModel):
+    """
+    Model for the max_times field of the JobSubmissionMetricsMaxResponse.
+    """
+
+    max_time: int
+    node_host: str
+    step: int
+    task: int
+
+
+class JobSubmissionMetricsMaxResponse(pydantic.BaseModel):
+    """
+    Model for the response of the `/jobbergate/job-submissions/agent/metrics/{job_submission_id}` endpoint.
+    """
+
+    job_submission_id: int
+    max_times: list[JobSubmissionMetricsMaxTimes]
diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/update.py b/jobbergate-agent/jobbergate_agent/jobbergate/update.py
index e3d2894bb..49c69f115 100644
--- a/jobbergate-agent/jobbergate_agent/jobbergate/update.py
+++ b/jobbergate-agent/jobbergate_agent/jobbergate/update.py
@@ -1,14 +1,26 @@
+import asyncio
 import json
-from typing import List
+from itertools import chain
+from typing import List, get_args, cast
+from collections.abc import Iterator
 
+import msgpack
 from jobbergate_core.tools.sbatch import InfoHandler
 from loguru import logger
 
 from jobbergate_agent.clients.cluster_api import backend_client as jobbergate_api_client
+from jobbergate_agent.clients.influx import INFLUXDB_CLIENT
-from jobbergate_agent.jobbergate.schemas import ActiveJobSubmission, SlurmJobData
+from jobbergate_agent.jobbergate.schemas import (
+    ActiveJobSubmission,
+    SlurmJobData,
+    JobSubmissionMetricsMaxResponse,
+    InfluxDBMeasurement,
+    InfluxDBMeasure,
+)
 from jobbergate_agent.settings import SETTINGS
 from jobbergate_agent.utils.exception import JobbergateApiError, SbatchError
 from jobbergate_agent.utils.logging import log_error
+from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT
 
 
 async def fetch_job_data(slurm_job_id: int, info_handler: InfoHandler) -> SlurmJobData:
@@ -70,6 +82,132 @@ async def update_job_data(
     response.raise_for_status()
 
 
+async def fetch_influx_data(
+    time: int, host: str, step: int, task: int, job: int, measurement: INFLUXDB_MEASUREMENT
+) -> list[InfluxDBMeasure]:
+    """
+    Fetch data from InfluxDB for a given host, step and task.
+    """
+    query = f"""
+    SELECT * FROM {measurement} WHERE time > $time AND host = $host AND step = $step AND task = $task AND job = $job
+    """
+    with JobbergateApiError.handle_errors("Failed to fetch data from InfluxDB", do_except=log_error):
+        assert INFLUXDB_CLIENT is not None
+        params = dict(time=time, host=host, step=str(step), task=str(task), job=str(job))
+        logger.debug(f"Querying InfluxDB with: {query=}, {params=}")
+        result = INFLUXDB_CLIENT.query(query, bind_params=params, epoch="us")
+        logger.debug("Successfully fetched data from InfluxDB")
+        return [
+            InfluxDBMeasure(
+                time=point["time"],
+                host=point["host"],
+                job=point["job"],
+                step=point["step"],
+                task=point["task"],
+                value=point["value"],
+                measurement=measurement,
+            )
+            for point in result.get_points()
+        ]
+
+
+def fetch_influx_measurements() -> list[InfluxDBMeasurement]:
+    """
+    Fetch measurements from InfluxDB.
+    """
+    with JobbergateApiError.handle_errors("Failed to fetch measurements from InfluxDB", do_except=log_error):
+        logger.debug("Fetching measurements from InfluxDB")
+        assert INFLUXDB_CLIENT is not None
+        measurements: list[InfluxDBMeasurement] = INFLUXDB_CLIENT.get_list_measurements()
+        logger.debug(f"Fetched measurements from InfluxDB: {measurements=}")
+        return measurements
+
+
+def aggregate_influx_measures(
+    data_points: Iterator[InfluxDBMeasure],
+) -> list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]]:
+    """Aggregate the list of data points by time, host, step and task.
+
+    The output data is a list of tuples with the following format:
+    [
+        (time, host, step, task, CPUFrequency, CPUTime, CPUUtilization, GPUMemMB,
+            GPUUtilization, Pages, RSS, ReadMB, VMSize, WriteMB),
+        ...
+    ]
+    """
+    measurement_names = get_args(INFLUXDB_MEASUREMENT)
+    default_measurements: dict[str, float] = {measurement: 0.0 for measurement in measurement_names}
+
+    aggregated_data: dict[tuple[int, str, str, str], dict[str, float]] = {}
+
+    for measure in data_points:
+        key = (measure["time"], measure["host"], measure["step"], measure["task"])
+
+        # aggregate measurements lazily to avoid creating a new dict for each point
+        if key not in aggregated_data:
+            aggregated_data[key] = default_measurements.copy()
+        aggregated_data[key][measure["measurement"]] = measure["value"]
+
+    return cast(
+        list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]],
+        [
+            (
+                time,
+                host,
+                step,
+                task,
+                *(aggregated_data[(time, host, step, task)][measurement] for measurement in measurement_names),
+            )
+            for (time, host, step, task) in aggregated_data
+        ],
+    )
+
+
+async def update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None:
+    """Update job metrics for a job submission.
+
+    This function fetches the metrics from InfluxDB and sends to the API.
+    """
+    with JobbergateApiError.handle_errors(
+        f"Could not update job metrics for slurm job {active_job_submittion.slurm_job_id} via the API",
+        do_except=log_error,
+    ):
+        response = await jobbergate_api_client.get(
+            f"jobbergate/job-submissions/agent/metrics/{active_job_submittion.id}"
+        )
+        response.raise_for_status()
+        job_max_times = JobSubmissionMetricsMaxResponse(**response.json())
+
+        influx_measurements = fetch_influx_measurements()
+
+        tasks = (
+            fetch_influx_data(
+                job_max_time.max_time,
+                job_max_time.node_host,
+                job_max_time.step,
+                job_max_time.task,
+                active_job_submittion.slurm_job_id,
+                measurement["name"],
+            )
+            for job_max_time in job_max_times.max_times
+            for measurement in influx_measurements
+        )
+        results = await asyncio.gather(*list(tasks))
+        data_points = chain.from_iterable(results)
+        aggregated_data_points = aggregate_influx_measures(data_points)
+        packed_data = msgpack.packb(aggregated_data_points)
+
+        request_payload = {
+            "slurm_job_id": active_job_submittion.slurm_job_id,
+            "binary_data": packed_data,
+        }
+        response = await jobbergate_api_client.put(
+            f"jobbergate/job-submissions/agent/metrics/{active_job_submittion.id}",
+            data=request_payload,
+        )
+        response.raise_for_status()
+
+
 async def update_active_jobs() -> None:
     """
     Update slurm job state for active jobs.
@@ -83,6 +221,14 @@ async def update_active_jobs() -> None:
     skip = "skipping to next active job"
 
     for active_job_submission in active_job_submissions:
+        if SETTINGS.INFLUX_INTEGRATION_ENABLED:
+            logger.debug(f"Updating job metrics for job_submission {active_job_submission.id}")
+            try:
+                await update_job_metrics(active_job_submission)
+            except Exception:
+                logger.error("Update job metrics failed... skipping for job data update")
+                pass
+
         logger.debug(f"Fetching slurm job state of job_submission {active_job_submission.id}")
         try:
diff --git a/jobbergate-agent/jobbergate_agent/settings.py b/jobbergate-agent/jobbergate_agent/settings.py
index 1bcb21688..5eb72f256 100644
--- a/jobbergate-agent/jobbergate_agent/settings.py
+++ b/jobbergate-agent/jobbergate_agent/settings.py
@@ -3,7 +3,7 @@
 from typing import Optional
 
 import buzz
-from pydantic import AnyHttpUrl, Field, ValidationError, model_validator
+from pydantic import AnyHttpUrl, Field, ValidationError, model_validator, field_validator, AnyUrl
 from pydantic_settings import BaseSettings, SettingsConfigDict
 from typing_extensions import Self
 
@@ -60,6 +60,21 @@ class Settings(BaseSettings):
     # Job submission settings
     WRITE_SUBMISSION_FILES: bool = True
 
+    # InfluxDB settings for job metric collection
+    INFLUX_DSN: Optional[AnyUrl] = Field(
+        None, description="InfluxDB DSN. Only supports the schemes 'influxdb', 'https+influxdb' and 'udp+influxdb'"
+    )
+    INFLUX_POOL_SIZE: int = Field(10, ge=1, description="Number of InfluxDB connections to pool")
+    INFLUX_SSL: bool = Field(False, description="Use SSL for InfluxDB connection")
+    INFLUX_VERIFY_SSL: bool = Field(False, description="Verify SSL certificate for InfluxDB connection")
+    INFLUX_TIMEOUT: Optional[int] = Field(None, ge=1, description="Timeout for InfluxDB connection")
+    INFLUX_UDP_PORT: int = Field(4444, ge=1, le=65535, description="UDP port for InfluxDB connection")
+    INFLUX_CERT_PATH: Optional[Path] = Field(None, description="Path to InfluxDB certificate file")
+    INFLUX_INTEGRATION_ENABLED: bool = Field(
+        False,
+        description="Control parameter for indicating if InfluxDB integration is enabled. It shouldn't be manually configured.",
+    )
+
     @model_validator(mode="after")
     def compute_extra_settings(self) -> Self:
         """
@@ -81,6 +96,32 @@ def compute_extra_settings(self) -> Self:
             self.SINGLE_USER_SUBMITTER = self.X_SLURM_USER_NAME
         return self
 
+    @model_validator(mode="after")
+    def validate_influxdb_settings(self) -> Self:
+        if self.INFLUX_DSN is not None:
+            self.INFLUX_INTEGRATION_ENABLED = True
+
+            if self.INFLUX_SSL:
+                buzz.require_condition(
+                    self.INFLUX_CERT_PATH is not None,
+                    "INFLUX_CERT_PATH must be provided when INFLUX_SSL is enabled",
+                    ValueError,
+                )
+
+            if self.INFLUX_DSN.scheme not in ["influxdb", "https+influxdb", "udp+influxdb"]:
+                raise ValueError("INFLUX_DSN scheme must be one of 'influxdb', 'https+influxdb' or 'udp+influxdb'")
+        return self
+
+    @field_validator("INFLUX_INTEGRATION_ENABLED", mode="before")
+    @classmethod
+    def validate_influxdb_integration_enabled_value(cls, value: bool):
+        buzz.require_condition(
+            not value,
+            "The INFLUX_INTEGRATION_ENABLED configuration should not be manually configured",
+            ValueError,
+        )
+        return value
+
     model_config = SettingsConfigDict(env_prefix="JOBBERGATE_AGENT_", env_file=_get_env_file(), extra="ignore")
diff --git a/jobbergate-agent/poetry.lock b/jobbergate-agent/poetry.lock
index c642363a1..d34465f26 100644
--- a/jobbergate-agent/poetry.lock
+++ b/jobbergate-agent/poetry.lock
@@ -383,6 +383,27 @@ files = [
     {file = "idna-3.7.tar.gz", hash = "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"},
 ]
 
+[[package]]
+name = "influxdb"
+version = "5.3.2"
+description = "InfluxDB client"
+optional = false
+python-versions = "*"
+files = [
+    {file = "influxdb-5.3.2-py2.py3-none-any.whl", hash = "sha256:00d86b18a968d011b2eee39ec3b2ae941b1dcf7086bc7211e675914623caffcd"},
+    {file = "influxdb-5.3.2.tar.gz", hash =
"sha256:58c647f6043712dd86e9aee12eb4ccfbbb5415467bc9910a48aa8c74c1108970"}, +] + +[package.dependencies] +msgpack = "*" +python-dateutil = ">=2.6.0" +pytz = "*" +requests = ">=2.17.0" +six = ">=1.10.0" + +[package.extras] +test = ["mock", "nose", "nose-cov", "requests-mock"] + [[package]] name = "iniconfig" version = "2.0.0" @@ -432,6 +453,79 @@ win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""} [package.extras] dev = ["Sphinx (>=4.1.1)", "black (>=19.10b0)", "colorama (>=0.3.4)", "docutils (==0.16)", "flake8 (>=3.7.7)", "isort (>=5.1.1)", "pytest (>=4.6.2)", "pytest-cov (>=2.7.1)", "sphinx-autobuild (>=0.7.1)", "sphinx-rtd-theme (>=0.4.3)", "tox (>=3.9.0)"] +[[package]] +name = "msgpack" +version = "1.1.0" +description = "MessagePack serializer" +optional = false +python-versions = ">=3.8" +files = [ + {file = "msgpack-1.1.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:7ad442d527a7e358a469faf43fda45aaf4ac3249c8310a82f0ccff9164e5dccd"}, + {file = "msgpack-1.1.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:74bed8f63f8f14d75eec75cf3d04ad581da6b914001b474a5d3cd3372c8cc27d"}, + {file = "msgpack-1.1.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:914571a2a5b4e7606997e169f64ce53a8b1e06f2cf2c3a7273aa106236d43dd5"}, + {file = "msgpack-1.1.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c921af52214dcbb75e6bdf6a661b23c3e6417f00c603dd2070bccb5c3ef499f5"}, + {file = "msgpack-1.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d8ce0b22b890be5d252de90d0e0d119f363012027cf256185fc3d474c44b1b9e"}, + {file = "msgpack-1.1.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:73322a6cc57fcee3c0c57c4463d828e9428275fb85a27aa2aa1a92fdc42afd7b"}, + {file = "msgpack-1.1.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:e1f3c3d21f7cf67bcf2da8e494d30a75e4cf60041d98b3f79875afb5b96f3a3f"}, + {file = "msgpack-1.1.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:64fc9068d701233effd61b19efb1485587560b66fe57b3e50d29c5d78e7fef68"}, + {file = "msgpack-1.1.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:42f754515e0f683f9c79210a5d1cad631ec3d06cea5172214d2176a42e67e19b"}, + {file = "msgpack-1.1.0-cp310-cp310-win32.whl", hash = "sha256:3df7e6b05571b3814361e8464f9304c42d2196808e0119f55d0d3e62cd5ea044"}, + {file = "msgpack-1.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:685ec345eefc757a7c8af44a3032734a739f8c45d1b0ac45efc5d8977aa4720f"}, + {file = "msgpack-1.1.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:3d364a55082fb2a7416f6c63ae383fbd903adb5a6cf78c5b96cc6316dc1cedc7"}, + {file = "msgpack-1.1.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:79ec007767b9b56860e0372085f8504db5d06bd6a327a335449508bbee9648fa"}, + {file = "msgpack-1.1.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:6ad622bf7756d5a497d5b6836e7fc3752e2dd6f4c648e24b1803f6048596f701"}, + {file = "msgpack-1.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8e59bca908d9ca0de3dc8684f21ebf9a690fe47b6be93236eb40b99af28b6ea6"}, + {file = "msgpack-1.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5e1da8f11a3dd397f0a32c76165cf0c4eb95b31013a94f6ecc0b280c05c91b59"}, + {file = "msgpack-1.1.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:452aff037287acb1d70a804ffd022b21fa2bb7c46bee884dbc864cc9024128a0"}, + {file = 
"msgpack-1.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8da4bf6d54ceed70e8861f833f83ce0814a2b72102e890cbdfe4b34764cdd66e"}, + {file = "msgpack-1.1.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:41c991beebf175faf352fb940bf2af9ad1fb77fd25f38d9142053914947cdbf6"}, + {file = "msgpack-1.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:a52a1f3a5af7ba1c9ace055b659189f6c669cf3657095b50f9602af3a3ba0fe5"}, + {file = "msgpack-1.1.0-cp311-cp311-win32.whl", hash = "sha256:58638690ebd0a06427c5fe1a227bb6b8b9fdc2bd07701bec13c2335c82131a88"}, + {file = "msgpack-1.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd2906780f25c8ed5d7b323379f6138524ba793428db5d0e9d226d3fa6aa1788"}, + {file = "msgpack-1.1.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:d46cf9e3705ea9485687aa4001a76e44748b609d260af21c4ceea7f2212a501d"}, + {file = "msgpack-1.1.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:5dbad74103df937e1325cc4bfeaf57713be0b4f15e1c2da43ccdd836393e2ea2"}, + {file = "msgpack-1.1.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:58dfc47f8b102da61e8949708b3eafc3504509a5728f8b4ddef84bd9e16ad420"}, + {file = "msgpack-1.1.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4676e5be1b472909b2ee6356ff425ebedf5142427842aa06b4dfd5117d1ca8a2"}, + {file = "msgpack-1.1.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:17fb65dd0bec285907f68b15734a993ad3fc94332b5bb21b0435846228de1f39"}, + {file = "msgpack-1.1.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a51abd48c6d8ac89e0cfd4fe177c61481aca2d5e7ba42044fd218cfd8ea9899f"}, + {file = "msgpack-1.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2137773500afa5494a61b1208619e3871f75f27b03bcfca7b3a7023284140247"}, + {file = "msgpack-1.1.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:398b713459fea610861c8a7b62a6fec1882759f308ae0795b5413ff6a160cf3c"}, + {file = "msgpack-1.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:06f5fd2f6bb2a7914922d935d3b8bb4a7fff3a9a91cfce6d06c13bc42bec975b"}, + {file = "msgpack-1.1.0-cp312-cp312-win32.whl", hash = "sha256:ad33e8400e4ec17ba782f7b9cf868977d867ed784a1f5f2ab46e7ba53b6e1e1b"}, + {file = "msgpack-1.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:115a7af8ee9e8cddc10f87636767857e7e3717b7a2e97379dc2054712693e90f"}, + {file = "msgpack-1.1.0-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:071603e2f0771c45ad9bc65719291c568d4edf120b44eb36324dcb02a13bfddf"}, + {file = "msgpack-1.1.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:0f92a83b84e7c0749e3f12821949d79485971f087604178026085f60ce109330"}, + {file = "msgpack-1.1.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:4a1964df7b81285d00a84da4e70cb1383f2e665e0f1f2a7027e683956d04b734"}, + {file = "msgpack-1.1.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:59caf6a4ed0d164055ccff8fe31eddc0ebc07cf7326a2aaa0dbf7a4001cd823e"}, + {file = "msgpack-1.1.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0907e1a7119b337971a689153665764adc34e89175f9a34793307d9def08e6ca"}, + {file = "msgpack-1.1.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:65553c9b6da8166e819a6aa90ad15288599b340f91d18f60b2061f402b9a4915"}, + {file = "msgpack-1.1.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:7a946a8992941fea80ed4beae6bff74ffd7ee129a90b4dd5cf9c476a30e9708d"}, + {file = 
"msgpack-1.1.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:4b51405e36e075193bc051315dbf29168d6141ae2500ba8cd80a522964e31434"}, + {file = "msgpack-1.1.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:b4c01941fd2ff87c2a934ee6055bda4ed353a7846b8d4f341c428109e9fcde8c"}, + {file = "msgpack-1.1.0-cp313-cp313-win32.whl", hash = "sha256:7c9a35ce2c2573bada929e0b7b3576de647b0defbd25f5139dcdaba0ae35a4cc"}, + {file = "msgpack-1.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:bce7d9e614a04d0883af0b3d4d501171fbfca038f12c77fa838d9f198147a23f"}, + {file = "msgpack-1.1.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c40ffa9a15d74e05ba1fe2681ea33b9caffd886675412612d93ab17b58ea2fec"}, + {file = "msgpack-1.1.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1ba6136e650898082d9d5a5217d5906d1e138024f836ff48691784bbe1adf96"}, + {file = "msgpack-1.1.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e0856a2b7e8dcb874be44fea031d22e5b3a19121be92a1e098f46068a11b0870"}, + {file = "msgpack-1.1.0-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:471e27a5787a2e3f974ba023f9e265a8c7cfd373632247deb225617e3100a3c7"}, + {file = "msgpack-1.1.0-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:646afc8102935a388ffc3914b336d22d1c2d6209c773f3eb5dd4d6d3b6f8c1cb"}, + {file = "msgpack-1.1.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:13599f8829cfbe0158f6456374e9eea9f44eee08076291771d8ae93eda56607f"}, + {file = "msgpack-1.1.0-cp38-cp38-win32.whl", hash = "sha256:8a84efb768fb968381e525eeeb3d92857e4985aacc39f3c47ffd00eb4509315b"}, + {file = "msgpack-1.1.0-cp38-cp38-win_amd64.whl", hash = "sha256:879a7b7b0ad82481c52d3c7eb99bf6f0645dbdec5134a4bddbd16f3506947feb"}, + {file = "msgpack-1.1.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:53258eeb7a80fc46f62fd59c876957a2d0e15e6449a9e71842b6d24419d88ca1"}, + {file = "msgpack-1.1.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:7e7b853bbc44fb03fbdba34feb4bd414322180135e2cb5164f20ce1c9795ee48"}, + {file = "msgpack-1.1.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f3e9b4936df53b970513eac1758f3882c88658a220b58dcc1e39606dccaaf01c"}, + {file = "msgpack-1.1.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:46c34e99110762a76e3911fc923222472c9d681f1094096ac4102c18319e6468"}, + {file = "msgpack-1.1.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8a706d1e74dd3dea05cb54580d9bd8b2880e9264856ce5068027eed09680aa74"}, + {file = "msgpack-1.1.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:534480ee5690ab3cbed89d4c8971a5c631b69a8c0883ecfea96c19118510c846"}, + {file = "msgpack-1.1.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:8cf9e8c3a2153934a23ac160cc4cba0ec035f6867c8013cc6077a79823370346"}, + {file = "msgpack-1.1.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:3180065ec2abbe13a4ad37688b61b99d7f9e012a535b930e0e683ad6bc30155b"}, + {file = "msgpack-1.1.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:c5a91481a3cc573ac8c0d9aace09345d989dc4a0202b7fcb312c88c26d4e71a8"}, + {file = "msgpack-1.1.0-cp39-cp39-win32.whl", hash = "sha256:f80bc7d47f76089633763f952e67f8214cb7b3ee6bfa489b3cb6a84cfac114cd"}, + {file = "msgpack-1.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:4d1b7ff2d6146e16e8bd665ac726a89c74163ef8cd39fa8c1087d4e52d3a2325"}, + {file = "msgpack-1.1.0.tar.gz", hash = 
"sha256:dd432ccc2c72b914e4cb77afce64aab761c1137cc698be3984eee260bcb2896e"}, +] + [[package]] name = "mypy" version = "1.10.0" @@ -1190,4 +1284,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "88df49be47c3f609fe1c454bbcd2ea851a07ac84fc93c91339369c2e26b1237e" +content-hash = "d1ddbb21b3fb9146a0207b9b66fe575dd90562ab2980d2d7caa7b3099d445909" diff --git a/jobbergate-agent/pyproject.toml b/jobbergate-agent/pyproject.toml index 7944e2b9e..fe34a3490 100644 --- a/jobbergate-agent/pyproject.toml +++ b/jobbergate-agent/pyproject.toml @@ -34,6 +34,8 @@ python-jose = "3.3.0" sentry-sdk = "^1.1.0" auto-name-enum = "^2.0.0" pydantic-settings = "^2.3.3" +msgpack = "^1.1.0" +influxdb = "^5.3.2" [tool.stickywheel] # This will resolve the relative path to the jobbergate-core package at build time @@ -95,7 +97,7 @@ show_missing = true plugins = "pydantic.mypy" [[tool.mypy.overrides]] -module = ["dataclasses", "toml", "requests", "jose.*", "apscheduler.*"] +module = ["dataclasses", "toml", "requests", "jose.*", "apscheduler.*", "influxdb", "msgpack"] ignore_missing_imports = true [build-system] diff --git a/jobbergate-agent/tests/conftest.py b/jobbergate-agent/tests/conftest.py index a2b151e22..08dcc7746 100644 --- a/jobbergate-agent/tests/conftest.py +++ b/jobbergate-agent/tests/conftest.py @@ -2,6 +2,7 @@ import random import string from datetime import datetime, timezone +from typing import Callable from unittest import mock import httpx @@ -49,7 +50,7 @@ def caplog(caplog): @pytest.fixture -def tweak_settings(): +def tweak_settings() -> Callable[..., contextlib._GeneratorContextManager]: """ Provides a fixture to use as a context manager where the project settings may be temporarily changed. 
diff --git a/jobbergate-agent/tests/jobbergate/test_constants.py b/jobbergate-agent/tests/jobbergate/test_constants.py new file mode 100644 index 000000000..726c0e124 --- /dev/null +++ b/jobbergate-agent/tests/jobbergate/test_constants.py @@ -0,0 +1,10 @@ +"""Core module for verifying the constants used in the application.""" + +from typing import get_args + +from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT + + +def test_influxdb_measurement_sorting(): + """Check if the measurements are sorted in ascending order.""" + assert list(get_args(INFLUXDB_MEASUREMENT)) == sorted(get_args(INFLUXDB_MEASUREMENT)) diff --git a/jobbergate-agent/tests/jobbergate/test_update.py b/jobbergate-agent/tests/jobbergate/test_update.py index 92c235897..4d4c5cf9b 100644 --- a/jobbergate-agent/tests/jobbergate/test_update.py +++ b/jobbergate-agent/tests/jobbergate/test_update.py @@ -1,21 +1,126 @@ import json +import random +import time +from datetime import datetime +from typing import get_args, cast from unittest import mock +from collections.abc import Callable, Generator, Iterator +import contextlib import httpx import pytest import respx -from jobbergate_agent.jobbergate.schemas import ActiveJobSubmission, SlurmJobData +from jobbergate_agent.jobbergate.schemas import ActiveJobSubmission, SlurmJobData, InfluxDBMeasure from jobbergate_agent.jobbergate.update import ( fetch_active_submissions, fetch_job_data, update_active_jobs, update_job_data, + fetch_influx_data, + fetch_influx_measurements, + aggregate_influx_measures, + update_job_metrics, ) +from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT from jobbergate_agent.settings import SETTINGS from jobbergate_agent.utils.exception import JobbergateApiError +@pytest.fixture() +def generate_job_metrics_data() -> Callable[[int, int, int, int, int], Generator[InfluxDBMeasure]]: + """Generates sample InfluxDB data with multiple points per measurement.""" + + def _generate_influxdb_data( + num_points_per_measurement: int, num_hosts: int, num_jobs: int, num_steps: int, num_tasks: int + ) -> Generator[InfluxDBMeasure]: + current_time = int(datetime.now().timestamp()) + + for host in range(1, num_hosts + 1): + for job in range(1, num_jobs + 1): + for step in range(1, num_steps + 1): + for task in range(1, num_tasks + 1): + for measurement in get_args(INFLUXDB_MEASUREMENT): + for _ in range(num_points_per_measurement): + yield { + "time": current_time, + "host": f"host_{host}", + "job": str(job), + "step": str(step), + "task": str(task), + "value": random.random() * 100, + "measurement": measurement, + } + current_time += random.randint(30, 60) # increment time by a random interval + + return _generate_influxdb_data + + +@pytest.fixture() +def aggregate_job_metrics_data(): + """Generates aggregated InfluxDB data with multiple points per measurement.""" + + def _aggregate_influxdb_data( + data_points: Iterator[InfluxDBMeasure], + ) -> list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]]: + measurement_names = get_args(INFLUXDB_MEASUREMENT) + default_measurements: dict[str, float] = {measurement: 0.0 for measurement in measurement_names} + + aggregated_data: dict[tuple[int, str, str, str], dict[str, float]] = {} + + for measure in data_points: + key = (measure["time"], measure["host"], measure["step"], measure["task"]) + + # aggregate measurements lazily to avoid creating a new dict for each point + if key not in aggregated_data: + aggregated_data[key] = default_measurements.copy() + 
aggregated_data[key][measure["measurement"]] = measure["value"] + + return cast( + list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + [ + ( + time, + host, + step, + task, + *(aggregated_data[(time, host, step, task)][measurement] for measurement in measurement_names), + ) + for (time, host, step, task) in aggregated_data + ], + ) + + return _aggregate_influxdb_data + + +@pytest.fixture() +def job_max_times_response() -> Callable[[int, int, int, int], dict[str, int | list[dict[str, int | str]]]]: + """Generates a sample response for the endpoint + ``jobbergate/job-submissions/agent/metrics/``. + """ + + def _job_max_times_response( + job_submission_id: int, num_hosts: int, num_steps: int, num_tasks: int + ) -> dict[str, int | list[dict[str, int | str]]]: + current_time = int(datetime.now().timestamp()) + return { + "job_submission_id": job_submission_id, + "max_times": [ + { + "max_time": current_time, + "node_host": f"host_{host}", + "step": step, + "task": task, + } + for host in range(1, num_hosts + 1) + for step in range(1, num_steps + 1) + for task in range(1, num_tasks + 1) + ], + } + + return _job_max_times_response + + @pytest.mark.asyncio @pytest.mark.usefixtures("mock_access_token") async def test_fetch_job_data__success(): @@ -194,7 +299,10 @@ async def test_update_job_data__raises_JobbergateApiError_if_the_response_is_not @pytest.mark.asyncio @pytest.mark.usefixtures("mock_access_token") -async def test_update_active_jobs(mocker): +async def test_update_active_jobs( + mocker, + tweak_settings: Callable[..., contextlib._GeneratorContextManager], +): """ Test that the ``update_active_jobs()`` function can fetch active job submissions, retrieve the job data from slurm, and update the slurm job data on the submission via the API. 
@@ -203,15 +311,24 @@ async def test_update_active_jobs(mocker): mocked_sbatch = mock.MagicMock() mocker.patch("jobbergate_agent.jobbergate.update.InfoHandler", return_value=mocked_sbatch) + mocked_influxdb_client = mock.MagicMock() + mocker.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT", return_value=mocked_influxdb_client) + + active_job_submissions = [ + ActiveJobSubmission(id=1, slurm_job_id=11), # Will update + ActiveJobSubmission(id=2, slurm_job_id=22), # fetch_job_data throws exception + ActiveJobSubmission(id=3, slurm_job_id=33), # update_job_data throws exception + ] + mocker.patch( "jobbergate_agent.jobbergate.update.fetch_active_submissions", - return_value=[ - ActiveJobSubmission(id=1, slurm_job_id=11), # Will update - ActiveJobSubmission(id=2, slurm_job_id=22), # fetch_job_data throws exception - ActiveJobSubmission(id=3, slurm_job_id=33), # update_job_data throws exception - ], + return_value=active_job_submissions, ) + def _mocked_update_job_metrics(active_job_submission: ActiveJobSubmission): + if active_job_submission.slurm_job_id == 11: + raise Exception("CRASH!") + def _mocked_fetch_job_data(slurm_job_id, *args, **kwargs): if slurm_job_id == 22: raise Exception("BOOM!") @@ -234,19 +351,30 @@ def _mocked_update_job_data(job_submission_id, slurm_job_data): if job_submission_id == 3: raise Exception("BANG!") - mock_fetch = mocker.patch("jobbergate_agent.jobbergate.update.fetch_job_data", side_effect=_mocked_fetch_job_data) - mock_update = mocker.patch( + mock_update_job_metrics = mocker.patch( + "jobbergate_agent.jobbergate.update.update_job_metrics", side_effect=_mocked_update_job_metrics + ) + mock_fetch_job_data = mocker.patch( + "jobbergate_agent.jobbergate.update.fetch_job_data", side_effect=_mocked_fetch_job_data + ) + mock_update_job_data = mocker.patch( "jobbergate_agent.jobbergate.update.update_job_data", side_effect=_mocked_update_job_data ) - await update_active_jobs() + with tweak_settings(INFLUX_INTEGRATION_ENABLED=True): + await update_active_jobs() + + mock_update_job_metrics.assert_has_calls( + [mocker.call(active_job_submission) for active_job_submission in active_job_submissions] + ) + assert mock_update_job_metrics.call_count == 3 - mock_fetch.assert_has_calls( + mock_fetch_job_data.assert_has_calls( [mocker.call(11, mocked_sbatch), mocker.call(22, mocked_sbatch), mocker.call(33, mocked_sbatch)] ) - assert mock_fetch.call_count == 3 + assert mock_fetch_job_data.call_count == 3 - mock_update.assert_has_calls( + mock_update_job_data.assert_has_calls( [ mocker.call( 1, @@ -268,4 +396,427 @@ def _mocked_update_job_data(job_submission_id, slurm_job_data): ), ] ) - assert mock_update.call_count == 2 + assert mock_update_job_data.call_count == 2 + + +@pytest.mark.asyncio +@mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT") +async def test_fetch_influx_data__success(mocked_influxdb_client: mock.MagicMock): + """ + Test that the ``fetch_influx_data()`` function can successfully retrieve + data from InfluxDB as a list of ``InfluxDBMeasure``. 
+ """ + time = random.randint(0, 1000) # noqa: F811 + host = "test-host" + step = random.randint(0, 1000) + task = random.randint(0, 1000) + job = random.randint(0, 1000) + measurement_value = random.uniform(1, 1000) + measurement = random.choice(get_args(INFLUXDB_MEASUREMENT)) + + mocked_influxdb_client.query.return_value.get_points.return_value = [ + dict( + time=time, + host=host, + job=job, + step=step, + task=task, + value=measurement_value, + ) + ] + + result = await fetch_influx_data( + time=time, + host=host, + step=step, + task=task, + job=job, + measurement=measurement, + ) + + assert len(result) == 1 + assert result[0]["time"] == time + assert result[0]["host"] == host + assert result[0]["job"] == job + assert result[0]["step"] == step + assert result[0]["task"] == task + assert result[0]["value"] == measurement_value + assert result[0]["measurement"] == measurement + + +@pytest.mark.asyncio +@mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT") +async def test_fetch_influx_data__raises_JobbergateApiError_if_query_fails(mocked_influxdb_client: mock.MagicMock): + """ + Test that the ``fetch_influx_data()`` function will raise a JobbergateApiError + if the query to InfluxDB fails. + """ + measurement = random.choice(get_args(INFLUXDB_MEASUREMENT)) + + mocked_influxdb_client.query = mock.Mock(side_effect=Exception("BOOM!")) + + time = random.randint(0, 1000) # noqa: F811 + host = "test-host" + step = random.randint(0, 1000) + task = random.randint(0, 1000) + job = random.randint(0, 1000) + + query = f""" + SELECT * FROM {measurement} WHERE time > $time AND host = $host AND step = $step AND task = $task AND job = $job + """ + params = dict(time=time, host=host, step=str(step), task=str(task), job=str(job)) + + with pytest.raises(JobbergateApiError, match="Failed to fetch data from InfluxDB"): + await fetch_influx_data( + time=time, + host=host, + step=step, + task=task, + job=job, + measurement=measurement, + ) + + mocked_influxdb_client.query.assert_called_once_with(query, bind_params=params, epoch="us") + + +@pytest.mark.asyncio +async def test_fetch_influx_data__raises_JobbergateApiError_if_influxdb_client_is_None(): + """ + Test that the ``fetch_influx_data()`` function will raise a JobbergateApiError + if the INFLUXDB_CLIENT is None. + """ + measurement = random.choice(get_args(INFLUXDB_MEASUREMENT)) + with mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT", None): + with pytest.raises(JobbergateApiError, match="Failed to fetch data from InfluxDB"): + await fetch_influx_data( + time=random.randint(0, 1000), + host="test-host", + step=random.randint(0, 1000), + task=random.randint(0, 1000), + job=random.randint(0, 1000), + measurement=measurement, + ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "measurements", + [ + [{"name": "measurement1"}, {"name": "measurement2"}], + [{"name": "measurement1"}], + [], + ], +) +@mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT") +async def test_fetch_influx_measurements__success( + mocked_influxdb_client: mock.MagicMock, measurements: list[dict[str, str]] +): + """ + Test that the ``fetch_influx_measurements()`` function can successfully retrieve + measurements from InfluxDB. 
+ """ + mocked_influxdb_client.get_list_measurements.return_value = measurements + + result = fetch_influx_measurements() + + assert result == measurements + + +@pytest.mark.asyncio +async def test_fetch_influx_measurements__raises_JobbergateApiError_if_influxdb_client_is_None(): + """ + Test that the ``fetch_influx_measurements()`` function will raise a JobbergateApiError + if the INFLUXDB_CLIENT is None. + """ + with mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT", None): + with pytest.raises(JobbergateApiError, match="Failed to fetch measurements from InfluxDB"): + fetch_influx_measurements() + + +@pytest.mark.asyncio +@mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT") +async def test_fetch_influx_measurements__raises_JobbergateApiError_if_query_fails( + mocked_influxdb_client: mock.MagicMock, +): + """ + Test that the ``fetch_influx_measurements()`` function will raise a JobbergateApiError + if the query to InfluxDB fails. + """ + mocked_influxdb_client.get_list_measurements.side_effect = Exception("BOOM!") + + with pytest.raises(JobbergateApiError, match="Failed to fetch measurements from InfluxDB"): + fetch_influx_measurements() + + mocked_influxdb_client.get_list_measurements.assert_called_once_with() + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "num_points_per_measurement, num_hosts, num_jobs, num_steps, num_tasks", + [ + (3, 5, 2, 7, 4), + (1, 1, 1, 1, 1), + (7, 3, 10, 4, 2), + ], +) +async def test_aggregate_influx_measures__success( + num_points_per_measurement: int, + num_hosts: int, + num_jobs: int, + num_steps: int, + num_tasks: int, + generate_job_metrics_data: Callable[[int, int, int, int, int], Generator[InfluxDBMeasure]], + aggregate_job_metrics_data: Callable[ + [Iterator[InfluxDBMeasure]], + list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + ], +): + """ + Test that the ``aggregate_influx_measures()`` function can successfully aggregate + a list of InfluxDBMeasure data points. + """ + data_points = list( + generate_job_metrics_data( + num_points_per_measurement, + num_hosts, + num_jobs, + num_steps, + num_tasks, + ) + ) + + start_time = time.monotonic() + result = aggregate_influx_measures(iter(data_points)) + end_time = time.monotonic() + + expected_result = aggregate_job_metrics_data(iter(data_points)) + + assert result == expected_result + + print(f"Aggregated {len(data_points)} data points in {end_time - start_time:.5f} seconds") + + +@pytest.mark.asyncio +async def test_aggregate_influx_measures__empty_data_points(): + """ + Test that the ``aggregate_influx_measures()`` function returns an empty list + when given an empty iterator of data points. + """ + data_points = [] + + result = aggregate_influx_measures(iter(data_points)) + + assert result == [] + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("mock_access_token") +@pytest.mark.parametrize( + "job_submission_id, slurm_job_id", + [ + (1, 22), + (2, 33), + (3, 11), + ], +) +async def test_update_job_metrics__error_getting_metrics_from_api(job_submission_id: int, slurm_job_id: int): + """ + Test that the ``update_job_metrics()`` function will log an error if it fails + to get the job metrics from the API. 
+ """ + active_job_submission = ActiveJobSubmission(id=job_submission_id, slurm_job_id=slurm_job_id) + + with respx.mock: + respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}").mock( + return_value=httpx.Response(status_code=400) + ) + + with pytest.raises( + JobbergateApiError, match=f"Could not update job metrics for slurm job {slurm_job_id} via the API" + ): + await update_job_metrics(active_job_submission) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("mock_access_token") +@pytest.mark.parametrize( + "job_submission_id, slurm_job_id, num_hosts, num_steps, num_tasks, measurements", + [ + (1, 22, 5, 2, 7, [{"name": "measurement1"}, {"name": "measurement2"}]), + (2, 33, 1, 1, 1, [{"name": "measurement1"}]), + (3, 11, 3, 10, 4, [{"name": "measurement1"}, {"name": "measurement2"}, {"name": "measurement3"}]), + ], +) +@mock.patch("jobbergate_agent.jobbergate.update.fetch_influx_measurements") +@mock.patch("jobbergate_agent.jobbergate.update.fetch_influx_data") +@mock.patch("jobbergate_agent.jobbergate.update.aggregate_influx_measures") +@mock.patch("jobbergate_agent.jobbergate.update.msgpack") +@mock.patch("jobbergate_agent.jobbergate.update.chain") +async def test_update_job_metrics__error_sending_metrics_to_api( + mocked_chain: mock.MagicMock, + mocked_msgpack: mock.MagicMock, + mocked_aggregate_influx_measures: mock.MagicMock, + mocked_fetch_influx_data: mock.MagicMock, + mocked_fetch_influx_measurements: mock.MagicMock, + job_submission_id: int, + slurm_job_id: int, + num_hosts: int, + num_steps: int, + num_tasks: int, + measurements: list[dict[str, str]], + job_max_times_response: Callable[[int, int, int, int], dict[str, int | list[dict[str, int | str]]]], +): + """ + Test that the ``update_job_metrics()`` function will log an error if it fails + to send the job metrics to the API. 
+    """
+    active_job_submission = ActiveJobSubmission(id=job_submission_id, slurm_job_id=slurm_job_id)
+    job_max_times = job_max_times_response(job_submission_id, num_hosts, num_steps, num_tasks)
+
+    dummy_data_point = {
+        "time": 1,
+        "host": "host_1",
+        "job": "1",
+        "step": "1",
+        "task": "1",
+        "value": 1.0,
+        "measurement": "measurement1",
+    }
+    dummy_data_points = [dummy_data_point] * len(measurements) * len(job_max_times["max_times"])
+    iter_dummy_data_points = iter(dummy_data_points)
+
+    mocked_fetch_influx_measurements.return_value = measurements
+    mocked_fetch_influx_data.return_value = dummy_data_points
+    # doesn't return the real aggregated data due to test complexity
+    mocked_chain.from_iterable.return_value = iter_dummy_data_points
+    mocked_aggregate_influx_measures.return_value = "super-dummy-aggregated-data"
+    mocked_msgpack.packb.return_value = b"dummy-msgpack-data"
+
+    with respx.mock:
+        respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}").mock(
+            return_value=httpx.Response(
+                status_code=200,
+                json=job_max_times,
+            )
+        )
+        respx.put(
+            f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}",
+            data={"slurm_job_id": slurm_job_id, "binary_data": b"dummy-msgpack-data"},
+        ).mock(return_value=httpx.Response(status_code=400))
+
+        with pytest.raises(
+            JobbergateApiError, match=f"Could not update job metrics for slurm job {slurm_job_id} via the API"
+        ):
+            await update_job_metrics(active_job_submission)
+
+    mocked_fetch_influx_measurements.assert_called_once_with()
+    mocked_fetch_influx_data.assert_has_calls(
+        [
+            mock.call(
+                job_max_time["max_time"],
+                job_max_time["node_host"],
+                job_max_time["step"],
+                job_max_time["task"],
+                slurm_job_id,
+                measurement["name"],
+            )
+            for job_max_time in job_max_times["max_times"]
+            for measurement in measurements
+        ]
+    )
+    mocked_chain.from_iterable.assert_called_once_with(
+        [dummy_data_points] * len(measurements) * len(job_max_times["max_times"])
+    )
+    mocked_aggregate_influx_measures.assert_called_once_with(iter_dummy_data_points)
+    mocked_msgpack.packb.assert_called_once_with("super-dummy-aggregated-data")
+
+
+@pytest.mark.asyncio
+@pytest.mark.usefixtures("mock_access_token")
+@pytest.mark.parametrize(
+    "job_submission_id, slurm_job_id, num_hosts, num_steps, num_tasks, measurements",
+    [
+        (1, 22, 5, 2, 7, [{"name": "measurement1"}, {"name": "measurement2"}]),
+        (2, 33, 1, 1, 1, [{"name": "measurement1"}]),
+        (3, 11, 3, 10, 4, [{"name": "measurement1"}, {"name": "measurement2"}, {"name": "measurement3"}]),
+    ],
+)
+@mock.patch("jobbergate_agent.jobbergate.update.fetch_influx_measurements")
+@mock.patch("jobbergate_agent.jobbergate.update.fetch_influx_data")
+@mock.patch("jobbergate_agent.jobbergate.update.aggregate_influx_measures")
+@mock.patch("jobbergate_agent.jobbergate.update.msgpack")
+@mock.patch("jobbergate_agent.jobbergate.update.chain")
+async def test_update_job_metrics__success(
+    mocked_chain: mock.MagicMock,
+    mocked_msgpack: mock.MagicMock,
+    mocked_aggregate_influx_measures: mock.MagicMock,
+    mocked_fetch_influx_data: mock.MagicMock,
+    mocked_fetch_influx_measurements: mock.MagicMock,
+    job_submission_id: int,
+    slurm_job_id: int,
+    num_hosts: int,
+    num_steps: int,
+    num_tasks: int,
+    measurements: list[dict[str, str]],
+    job_max_times_response: Callable[[int, int, int, int], dict[str, int | list[dict[str, int | str]]]],
+):
+    """
+    Test that the ``update_job_metrics()`` function can successfully send the job metrics to
the API. + """ + active_job_submission = ActiveJobSubmission(id=job_submission_id, slurm_job_id=slurm_job_id) + job_max_times = job_max_times_response(job_submission_id, num_hosts, num_steps, num_tasks) + + dummy_data_point = { + "time": 1, + "host": "host_1", + "job": "1", + "step": "1", + "task": "1", + "value": 1.0, + "measurement": "measurement1", + } + dummy_data_points = [dummy_data_point] * len(measurements) * len(job_max_times["max_times"]) + iter_dummy_data_points = iter(dummy_data_points) + + mocked_fetch_influx_measurements.return_value = measurements + mocked_fetch_influx_data.return_value = dummy_data_points + # doesn't return the real aggregated data due to test complexity + mocked_chain.from_iterable.return_value = iter_dummy_data_points + mocked_aggregate_influx_measures.return_value = "super-dummy-aggregated-data" + mocked_msgpack.packb.return_value = b"dummy-msgpack-data" + + with respx.mock: + respx.get(f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}").mock( + return_value=httpx.Response( + status_code=200, + json=job_max_times, + ) + ) + respx.put( + f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}", + data={"slurm_job_id": slurm_job_id, "binary_data": b"dummy-msgpack-data"}, + ).mock(return_value=httpx.Response(status_code=200)) + + await update_job_metrics(active_job_submission) + + mocked_fetch_influx_measurements.assert_called_once_with() + mocked_fetch_influx_data.assert_has_calls( + [ + mock.call( + job_max_time["max_time"], + job_max_time["node_host"], + job_max_time["step"], + job_max_time["task"], + slurm_job_id, + measurement["name"], + ) + for job_max_time in job_max_times["max_times"] + for measurement in measurements + ] + ) + mocked_chain.from_iterable.assert_called_once_with( + [dummy_data_points] * len(measurements) * len(job_max_times["max_times"]) + ) + mocked_aggregate_influx_measures.assert_called_once_with(iter_dummy_data_points) + mocked_msgpack.packb.assert_called_once_with("super-dummy-aggregated-data") diff --git a/jobbergate-agent/tests/test_influxdb_client.py b/jobbergate-agent/tests/test_influxdb_client.py new file mode 100644 index 000000000..8d4b7c7b1 --- /dev/null +++ b/jobbergate-agent/tests/test_influxdb_client.py @@ -0,0 +1,70 @@ +"""Core module for testing the initialization of the InfluxDB client.""" + +import contextlib +from collections.abc import Callable +from unittest import mock + +import pytest + +from jobbergate_agent.clients.influx import initialize_influx_client + + +@mock.patch("jobbergate_agent.clients.influx.InfluxDBClient") +def test_client_is_None_if_integration_is_disabled( + mocked_InfluxDBClient: mock.MagicMock, tweak_settings: Callable[..., contextlib._GeneratorContextManager] +): + """Test that the client is None if the Influx integration is disabled.""" + with tweak_settings(INFLUX_INTEGRATION_ENABLED=False): + client = initialize_influx_client() + mocked_InfluxDBClient.assert_not_called() + mocked_InfluxDBClient.from_dsn.assert_not_called() + assert client is None + + +@pytest.mark.parametrize( + "pool_size, ssl, verify_ssl, timeout, udp_port, cert", + [ + (10, True, True, 10, 8089, "/path/to/cert"), + (20, False, False, 20, 8090, "/path/to/another/cert"), + (30, True, False, 30, 8091, "/maybe/another/cert"), + ], +) +@mock.patch("jobbergate_agent.clients.influx.InfluxDBClient") +def test_client_is_initialized( + mocked_InfluxDBClient: mock.MagicMock, + pool_size: int, + ssl: bool, + verify_ssl: bool, + timeout: int, + udp_port: int, + 
cert: str, + tweak_settings: Callable[..., contextlib._GeneratorContextManager], +): + """Test that the influxdb_client is properly initialized by the function ``initialize_influx_client ``.""" + mocked_InfluxDBClient.from_dsn = mock.Mock(return_value="dummy-value") + + influx_dsn = "influxdb://user:passwd@localhost:8086/database" + + with tweak_settings( + INFLUX_DSN=influx_dsn, + INFLUX_POOL_SIZE=pool_size, + INFLUX_SSL=ssl, + INFLUX_VERIFY_SSL=verify_ssl, + INFLUX_TIMEOUT=timeout, + INFLUX_UDP_PORT=udp_port, + INFLUX_CERT_PATH=cert, + INFLUX_INTEGRATION_ENABLED=True, + ): + client = initialize_influx_client() + + assert client == "dummy-value" + mocked_InfluxDBClient.from_dsn.assert_called_once_with( + influx_dsn, + pool_size=pool_size, + ssl=ssl, + verify_ssl=verify_ssl, + timeout=timeout, + udp_port=udp_port, + cert=cert, + ) + mocked_InfluxDBClient.assert_not_called() diff --git a/jobbergate-agent/tests/test_settings.py b/jobbergate-agent/tests/test_settings.py index e69de29bb..7f629cbc9 100644 --- a/jobbergate-agent/tests/test_settings.py +++ b/jobbergate-agent/tests/test_settings.py @@ -0,0 +1,36 @@ +"""Core module for testing the settings module.""" + +import pytest + +from jobbergate_agent.settings import Settings + + +def test_settings__manually_set_influx_integration_flag(): + """Test that the InfluxDB integration flag cannot be manually set.""" + with pytest.raises(ValueError): + Settings(INFLUX_DSN=None, INFLUX_INTEGRATION_ENABLED=True) + + +@pytest.mark.parametrize( + "influx_dsn, valid_scheme", + [ + ("http://localhost:8086", False), + ("http+influxdb://localhost:8086", False), + ("ftp://localhost:8086", False), + ("smtp://localhost:8086", False), + ("file://localhost:8086", False), + ], +) +def test_settings__check_invalid_influx_dsn_scheme(influx_dsn: str, valid_scheme: bool): + """Test if a few invalid DSN schemes raise ValueError.""" + if valid_scheme: + Settings(INFLUX_DSN=influx_dsn) + else: + with pytest.raises(ValueError): + Settings(INFLUX_DSN=influx_dsn) + + +def test_settings__check_influx_ssl_cert_path(): + """Test that the SSL certificate path is required when SSL is enabled.""" + with pytest.raises(ValueError): + Settings(INFLUX_DSN="https+influxdb://localhost:8086", INFLUX_SSL=True, INFLUX_CERT_PATH=None) From 71d3ffd4e2429ff3b3b0515b08464ca70465c02d Mon Sep 17 00:00:00 2001 From: Matheus Tosta Date: Wed, 27 Nov 2024 16:28:42 -0400 Subject: [PATCH 02/13] PENG-2457 add config options for the configure hook in the jobbergate agent snap --- jobbergate-agent-snap/hooks/bin/configure | 7 +++++++ jobbergate-agent-snap/snap/snapcraft.yaml | 16 +++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/jobbergate-agent-snap/hooks/bin/configure b/jobbergate-agent-snap/hooks/bin/configure index 3fa51de7b..551fb29ad 100755 --- a/jobbergate-agent-snap/hooks/bin/configure +++ b/jobbergate-agent-snap/hooks/bin/configure @@ -25,6 +25,13 @@ AGENT_VARIABLES_MAP: dict[str, Union[str, int]] = { "SLURM_USER_MAPPER": "", "SINGLE_USER_SUBMITTER": "ubuntu", "WRITE_SUBMISSION_FILES": "true", + "INFLUX_DSN": "", + "INFLUX_POOL_SIZE": 10, + "INFLUX_SSL": "false", + "INFLUX_VERIFY_SSL": "false", + "INFLUX_TIMEOUT": "", + "INFLUX_UDP_PORT": 4444, + "INFLUX_CERT_PATH": "", } diff --git a/jobbergate-agent-snap/snap/snapcraft.yaml b/jobbergate-agent-snap/snap/snapcraft.yaml index 6941d3c03..f8f1bda49 100644 --- a/jobbergate-agent-snap/snap/snapcraft.yaml +++ b/jobbergate-agent-snap/snap/snapcraft.yaml @@ -1,6 +1,6 @@ name: jobbergate-agent base: core22 -version: '0.4.0' 
+version: '0.5.0' summary: The Jobbergate Agent snap adopt-info: metadata license: MIT @@ -33,6 +33,20 @@ description: | - write-submission-files: A boolean value (true, false) that indicates whether the agent should write submission files to disk. This is optional and defaults to false. + - influx-dsn: The DSN of the InfluxDB server that the agent will use to fetch job metrics data. It only allows the following schemes: 'influxdb', 'https+influxdb' and 'udp+influxdb'. This is optional and defaults to none. + + - influx-pool-size: The size of the InfluxDB connection pool that the agent will use to fetch job metrics data. This is optional and defaults to 10. + + - influx-ssl: A boolean value (true, false) that indicates whether the agent should use SSL to connect to the InfluxDB server. If true, `influx-cert-path` must be provided. This is optional and defaults to false. + + - influx-verify-ssl: A boolean value (true, false) that indicates whether the agent should verify the SSL certificate of the InfluxDB server. This is optional and defaults to false. + + - influx-timeout: The timeout in seconds that the agent will use when connecting to the InfluxDB server. This is optional and defaults to none. + + - influx-udp-port: The UDP port that the agent will use to connect to the InfluxDB server. This is optional and defaults to 4444. + + - influx-cert-path: The absolute path to the SSL certificate that the agent will use to connect to the InfluxDB server. This is optional and defaults to none. + For learning more about Jobbergate and how it can be used on Vantage, please visit https://docs.vantagehpc.io grade: stable From 9bd83ad5051fba435551526285a778296e1fd7f8 Mon Sep 17 00:00:00 2001 From: matheushent Date: Tue, 3 Dec 2024 19:32:21 +0000 Subject: [PATCH 03/13] PENG-2457 edit the request for sending the metrics to the API --- jobbergate-agent/jobbergate_agent/jobbergate/update.py | 7 +------ jobbergate-agent/tests/jobbergate/test_update.py | 4 ++-- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/update.py b/jobbergate-agent/jobbergate_agent/jobbergate/update.py index 49c69f115..68effddd3 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/update.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/update.py @@ -197,13 +197,8 @@ async def update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None aggregated_data_points = aggregate_influx_measures(data_points) packed_data = msgpack.packb(aggregated_data_points) - request_payload = { - "slurm_job_id": active_job_submittion.slurm_job_id, - "binary_data": packed_data, - } response = await jobbergate_api_client.put( - f"jobbergate/job-submissions/agent/metrics/{active_job_submittion.id}", - data=request_payload, + f"jobbergate/job-submissions/agent/metrics/{active_job_submittion.id}", content=packed_data ) response.raise_for_status() diff --git a/jobbergate-agent/tests/jobbergate/test_update.py b/jobbergate-agent/tests/jobbergate/test_update.py index 4d4c5cf9b..931baf437 100644 --- a/jobbergate-agent/tests/jobbergate/test_update.py +++ b/jobbergate-agent/tests/jobbergate/test_update.py @@ -701,7 +701,7 @@ async def test_update_job_metrics__error_sending_metrics_to_api( ) respx.put( f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}", - data={"slurm_job_id": slurm_job_id, "binary_data": b"dummy-msgpack-data"}, + content=b"dummy-msgpack-data", ).mock(return_value=httpx.Response(status_code=400)) with pytest.raises( @@ -795,7 +795,7 @@ 
async def test_update_job_metrics__success( ) respx.put( f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}", - data={"slurm_job_id": slurm_job_id, "binary_data": b"dummy-msgpack-data"}, + content=b"dummy-msgpack-data", ).mock(return_value=httpx.Response(status_code=200)) await update_job_metrics(active_job_submission) From 651da55d7bfe5e062ce248cd0e62dac146f217ac Mon Sep 17 00:00:00 2001 From: matheushent Date: Tue, 3 Dec 2024 19:42:41 +0000 Subject: [PATCH 04/13] PENG-2457 remove the INFLUX_INTEGRATION_ENABLED settings and use a property instead --- .../jobbergate_agent/clients/influx.py | 2 +- .../jobbergate_agent/jobbergate/update.py | 2 +- jobbergate-agent/jobbergate_agent/settings.py | 25 ++++++------------- .../tests/jobbergate/test_update.py | 2 +- .../tests/test_influxdb_client.py | 3 +-- jobbergate-agent/tests/test_settings.py | 6 ++--- 6 files changed, 14 insertions(+), 26 deletions(-) diff --git a/jobbergate-agent/jobbergate_agent/clients/influx.py b/jobbergate-agent/jobbergate_agent/clients/influx.py index c3eb1411a..4465ff326 100644 --- a/jobbergate-agent/jobbergate_agent/clients/influx.py +++ b/jobbergate-agent/jobbergate_agent/clients/influx.py @@ -8,7 +8,7 @@ def initialize_influx_client() -> None | InfluxDBClient: """Initialize the InfluxDB client.""" - if SETTINGS.INFLUX_INTEGRATION_ENABLED: + if SETTINGS.influx_integration_enabled: logger.debug("InfluxDB integration is enabled. Initializing InfluxDB client...") return InfluxDBClient.from_dsn( str(SETTINGS.INFLUX_DSN), diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/update.py b/jobbergate-agent/jobbergate_agent/jobbergate/update.py index 68effddd3..2f8443f25 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/update.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/update.py @@ -216,7 +216,7 @@ async def update_active_jobs() -> None: skip = "skipping to next active job" for active_job_submission in active_job_submissions: - if SETTINGS.INFLUX_INTEGRATION_ENABLED: + if SETTINGS.influx_integration_enabled: logger.debug(f"Updating job metrics for job_submission {active_job_submission.id}") try: await update_job_metrics(active_job_submission) diff --git a/jobbergate-agent/jobbergate_agent/settings.py b/jobbergate-agent/jobbergate_agent/settings.py index 5eb72f256..0c9540e4c 100644 --- a/jobbergate-agent/jobbergate_agent/settings.py +++ b/jobbergate-agent/jobbergate_agent/settings.py @@ -3,7 +3,7 @@ from typing import Optional import buzz -from pydantic import AnyHttpUrl, Field, ValidationError, model_validator, field_validator, AnyUrl +from pydantic import AnyHttpUrl, Field, ValidationError, model_validator, AnyUrl from pydantic_settings import BaseSettings, SettingsConfigDict from typing_extensions import Self @@ -70,10 +70,10 @@ class Settings(BaseSettings): INFLUX_TIMEOUT: Optional[int] = Field(None, ge=1, description="Timeout for InfluxDB connection") INFLUX_UDP_PORT: int = Field(4444, ge=1, le=65535, description="UDP port for InfluxDB connection") INFLUX_CERT_PATH: Optional[Path] = Field(None, description="Path to InfluxDB certificate file") - INFLUX_INTEGRATION_ENABLED: bool = Field( - False, - description="Control parameter for indicating if InfluxDB integration is enabled. 
It shouldn't be manually configured.", - ) + + @property + def influx_integration_enabled(self) -> bool: + return self.INFLUX_DSN is not None @model_validator(mode="after") def compute_extra_settings(self) -> Self: @@ -98,9 +98,7 @@ def compute_extra_settings(self) -> Self: @model_validator(mode="after") def validate_influxdb_settings(self) -> Self: - if self.INFLUX_DSN is not None: - self.INFLUX_INTEGRATION_ENABLED = True - + if self.influx_integration_enabled: if self.INFLUX_SSL: buzz.require_condition( self.INFLUX_CERT_PATH is not None, @@ -108,20 +106,11 @@ def validate_influxdb_settings(self) -> Self: ValueError, ) + assert self.INFLUX_DSN is not None if self.INFLUX_DSN.scheme not in ["influxdb", "https+influxdb", "udp+influxdb"]: raise ValueError("INFLUX_DSN scheme must be one of 'influxdb', 'https+influxdb' or 'udp+influxdb'") return self - @field_validator("INFLUX_INTEGRATION_ENABLED", mode="before") - @classmethod - def validate_influxdb_integration_enabled_value(cls, value: bool): - buzz.require_condition( - not value, - "The INFLUX_INTEGRATION_ENABLED configuration should not be manually configured", - ValueError, - ) - return value - model_config = SettingsConfigDict(env_prefix="JOBBERGATE_AGENT_", env_file=_get_env_file(), extra="ignore") diff --git a/jobbergate-agent/tests/jobbergate/test_update.py b/jobbergate-agent/tests/jobbergate/test_update.py index 931baf437..d238aa263 100644 --- a/jobbergate-agent/tests/jobbergate/test_update.py +++ b/jobbergate-agent/tests/jobbergate/test_update.py @@ -361,7 +361,7 @@ def _mocked_update_job_data(job_submission_id, slurm_job_data): "jobbergate_agent.jobbergate.update.update_job_data", side_effect=_mocked_update_job_data ) - with tweak_settings(INFLUX_INTEGRATION_ENABLED=True): + with tweak_settings(INFLUX_DSN="https://influxdb:8086"): await update_active_jobs() mock_update_job_metrics.assert_has_calls( diff --git a/jobbergate-agent/tests/test_influxdb_client.py b/jobbergate-agent/tests/test_influxdb_client.py index 8d4b7c7b1..5874d9e5b 100644 --- a/jobbergate-agent/tests/test_influxdb_client.py +++ b/jobbergate-agent/tests/test_influxdb_client.py @@ -14,7 +14,7 @@ def test_client_is_None_if_integration_is_disabled( mocked_InfluxDBClient: mock.MagicMock, tweak_settings: Callable[..., contextlib._GeneratorContextManager] ): """Test that the client is None if the Influx integration is disabled.""" - with tweak_settings(INFLUX_INTEGRATION_ENABLED=False): + with tweak_settings(INFLUX_DSN=None): client = initialize_influx_client() mocked_InfluxDBClient.assert_not_called() mocked_InfluxDBClient.from_dsn.assert_not_called() @@ -53,7 +53,6 @@ def test_client_is_initialized( INFLUX_TIMEOUT=timeout, INFLUX_UDP_PORT=udp_port, INFLUX_CERT_PATH=cert, - INFLUX_INTEGRATION_ENABLED=True, ): client = initialize_influx_client() diff --git a/jobbergate-agent/tests/test_settings.py b/jobbergate-agent/tests/test_settings.py index 7f629cbc9..cdc989ece 100644 --- a/jobbergate-agent/tests/test_settings.py +++ b/jobbergate-agent/tests/test_settings.py @@ -5,10 +5,10 @@ from jobbergate_agent.settings import Settings -def test_settings__manually_set_influx_integration_flag(): +def test_settings__check_influx_integration_flag(): """Test that the InfluxDB integration flag cannot be manually set.""" - with pytest.raises(ValueError): - Settings(INFLUX_DSN=None, INFLUX_INTEGRATION_ENABLED=True) + settings = Settings(INFLUX_DSN=None) + assert settings.influx_integration_enabled is False @pytest.mark.parametrize( From 344042a07950c57cdf9335bc08e454f166bcab99 Mon Sep 17 
00:00:00 2001
From: matheushent
Date: Thu, 5 Dec 2024 14:38:33 +0000
Subject: [PATCH 05/13] PENG-2457 refactor *aggregate_influx_measures* to use
 numpy and numba instead of vanilla Python, and create a module specific to
 compute-related operations

---
 .../jobbergate_agent/jobbergate/update.py     |  44 +--
 .../jobbergate_agent/utils/compute.py         | 126 +++++++
 jobbergate-agent/poetry.lock                  | 122 ++++++-
 jobbergate-agent/pyproject.toml               |   3 +-
 .../tests/jobbergate/test_compute.py          | 307 ++++++++++++++++++
 .../tests/jobbergate/test_update.py           | 133 +-------
 6 files changed, 560 insertions(+), 175 deletions(-)
 create mode 100644 jobbergate-agent/jobbergate_agent/utils/compute.py
 create mode 100644 jobbergate-agent/tests/jobbergate/test_compute.py

diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/update.py b/jobbergate-agent/jobbergate_agent/jobbergate/update.py
index 2f8443f25..9203bf9f8 100644
--- a/jobbergate-agent/jobbergate_agent/jobbergate/update.py
+++ b/jobbergate-agent/jobbergate_agent/jobbergate/update.py
@@ -1,8 +1,7 @@
 import asyncio
 import json
 from itertools import chain
-from typing import List, get_args, cast
-from collections.abc import Iterator
+from typing import List
 
 import msgpack
 from jobbergate_core.tools.sbatch import InfoHandler
@@ -21,6 +20,7 @@
 from jobbergate_agent.utils.exception import JobbergateApiError, SbatchError
 from jobbergate_agent.utils.logging import log_error
 from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT
+from jobbergate_agent.utils.compute import aggregate_influx_measures
 
 
 async def fetch_job_data(slurm_job_id: int, info_handler: InfoHandler) -> SlurmJobData:
@@ -123,46 +123,6 @@ def fetch_influx_measurements() -> list[InfluxDBMeasurement]:
     return measurements
 
 
-def aggregate_influx_measures(
-    data_points: Iterator[InfluxDBMeasure],
-) -> list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]]:
-    """Aggregate the list of data points by time, host, step and task.
-
-    The output data is a list of tuples with the following format:
-    [
-        (time, host, step, task, CPUFrequency, CPUTime, CPUUtilization, GPUMemMB,
-        GPUUtilization, Pages, RSS, VMSize, ReadMB, WriteMB),
-        ...
-    ]
-    """
-    measurement_names = get_args(INFLUXDB_MEASUREMENT)
-    default_measurements: dict[str, float] = {measurement: 0.0 for measurement in measurement_names}
-
-    aggregated_data: dict[tuple[int, str, str, str], dict[str, float]] = {}
-
-    for measure in data_points:
-        key = (measure["time"], measure["host"], measure["step"], measure["task"])
-
-        # aggregate measurements lazily to avoid creating a new dict for each point
-        if key not in aggregated_data:
-            aggregated_data[key] = default_measurements.copy()
-        aggregated_data[key][measure["measurement"]] = measure["value"]
-
-    return cast(
-        list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]],
-        [
-            (
-                time,
-                host,
-                step,
-                task,
-                *(aggregated_data[(time, host, step, task)][measurement] for measurement in measurement_names),
-            )
-            for (time, host, step, task) in aggregated_data
-        ],
-    )
-
-
 async def update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None:
     """Update job metrics for a job submission.
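For orientation before the new compute module below, here is a minimal sketch of the aggregation contract this refactor preserves: points sharing a (time, host, step, task) key collapse into one wide tuple with one column per measurement. The sketch is illustrative only and not part of the patch; the measurement list is truncated to two names and the sample values are invented.

    # Stand-in for the ten-name INFLUXDB_MEASUREMENT literal, truncated for brevity.
    MEASUREMENTS = ("CPUTime", "RSS")

    points = [
        {"time": 1, "host": "h1", "step": "0", "task": "0", "value": 2.5, "measurement": "CPUTime"},
        {"time": 1, "host": "h1", "step": "0", "task": "0", "value": 512.0, "measurement": "RSS"},
        {"time": 2, "host": "h1", "step": "0", "task": "0", "value": 3.0, "measurement": "CPUTime"},
    ]

    # Group by key; a measurement that never appears for a key stays 0.0,
    # mirroring the vanilla implementation removed above.
    aggregated: dict[tuple, dict[str, float]] = {}
    for p in points:
        key = (p["time"], p["host"], p["step"], p["task"])
        aggregated.setdefault(key, {m: 0.0 for m in MEASUREMENTS})[p["measurement"]] = p["value"]

    rows = [(*key, *(vals[m] for m in MEASUREMENTS)) for key, vals in aggregated.items()]
    assert rows == [(1, "h1", "0", "0", 2.5, 512.0), (2, "h1", "0", "0", 3.0, 0.0)]

The numpy/numba version in utils/compute.py produces the same rows by mapping the string columns to integer indices and filling a dense (keys x measurements) array, which avoids allocating a Python dict per point on large result sets.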
diff --git a/jobbergate-agent/jobbergate_agent/utils/compute.py b/jobbergate-agent/jobbergate_agent/utils/compute.py new file mode 100644 index 000000000..0ca13677f --- /dev/null +++ b/jobbergate-agent/jobbergate_agent/utils/compute.py @@ -0,0 +1,126 @@ +"""Core module for compute related functions.""" + +import tracemalloc +from collections.abc import Callable +from functools import wraps +from typing import Any, get_args, cast +from collections.abc import Iterator + +import numpy as np +from loguru import logger +from numba import njit + +from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure + +from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT + + +def measure_memory_usage(func: Callable) -> Callable: + """Decorator to measure the memory usage of a function. + + Args: + func: Function to measure memory usage of. + + Returns: + Decorated function. + """ + + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + tracemalloc.start() + result = func(*args, **kwargs) + current, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + + logger.debug(f"Memory usage for function '{func.__name__}': {current=}B, {peak=}B") + return result + + return wrapper + + +def _create_mapping(column): + """Create a mapping of unique strings to integers.""" + unique_values = sorted(set(column)) + return {val: idx for idx, val in enumerate(unique_values)} + + +@njit +def _aggregate_with_numba( + values: np.ndarray, key_indices: np.ndarray, measurement_indices: np.ndarray, num_keys: int, num_measurements: int +): + """ + Perform aggregation using numba. + """ + aggregated_values = np.zeros((num_keys, num_measurements), dtype=np.float64) + + for i in range(len(values)): + key_idx = key_indices[i] + measurement_idx = measurement_indices[i] + aggregated_values[key_idx, measurement_idx] += values[i] + + return aggregated_values + + +@measure_memory_usage +def aggregate_influx_measures( + data_points: Iterator[InfluxDBMeasure], +) -> list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]]: + """Aggregate the list of data points by time, host, step and task. + + The output data is a list of tuples with the following format: + [ + (time, host, step, task, CPUFrequency, CPUTime, CPUUtilization, GPUMemMB, + GPUUtilization, Pages, RSS, VMSize, ReadMB, WriteMB), + ... 
+ ] + """ + measurement_names = get_args(INFLUXDB_MEASUREMENT) + measurement_mapping = {name: idx for idx, name in enumerate(measurement_names)} + num_measurements = len(measurement_names) + + data_points_list = list(data_points) + + # Extract columns and map strings to integers + times = np.fromiter(map(lambda d: d["time"], data_points_list), dtype=np.int64) + hosts = np.fromiter(map(lambda d: d["host"], data_points_list), dtype=np.object_) + steps = np.fromiter(map(lambda d: d["step"], data_points_list), dtype=np.object_) + tasks = np.fromiter(map(lambda d: d["task"], data_points_list), dtype=np.object_) + measurements = np.fromiter(map(lambda d: measurement_mapping[d["measurement"]], data_points_list), dtype=np.int8) + values = np.fromiter(map(lambda d: d["value"], data_points_list), dtype=np.float64) + + # Create mappings for string columns + host_mapping = _create_mapping(hosts) + step_mapping = _create_mapping(steps) + task_mapping = _create_mapping(tasks) + + # Map strings to integers + host_indices = np.array([host_mapping[h] for h in hosts], dtype=np.int64) + step_indices = np.array([step_mapping[s] for s in steps], dtype=np.int64) + task_indices = np.array([task_mapping[t] for t in tasks], dtype=np.int64) + + # Combine keys for grouping + keys = np.stack((times, host_indices, step_indices, task_indices), axis=1) + unique_keys, key_indices = np.unique(keys, axis=0, return_inverse=True) + num_keys = len(unique_keys) + + # Perform aggregation + aggregated_values = _aggregate_with_numba(values, key_indices, measurements, num_keys, num_measurements) + + # Convert results back to original format + reverse_host_mapping = {v: k for k, v in host_mapping.items()} + reverse_step_mapping = {v: k for k, v in step_mapping.items()} + reverse_task_mapping = {v: k for k, v in task_mapping.items()} + + return cast( + list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + [ + ( + int(unique_key[0]), # time + reverse_host_mapping[unique_key[1]], # host + reverse_step_mapping[unique_key[2]], # step + reverse_task_mapping[unique_key[3]], # task + *map(float, aggregated_values[i]), + ) + for i, unique_key in enumerate(unique_keys) + ], + ) diff --git a/jobbergate-agent/poetry.lock b/jobbergate-agent/poetry.lock index d34465f26..c0ffdab70 100644 --- a/jobbergate-agent/poetry.lock +++ b/jobbergate-agent/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. 
[[package]] name = "annotated-types" @@ -435,6 +435,36 @@ python-jose = "^3.3.0" type = "directory" url = "../jobbergate-core" +[[package]] +name = "llvmlite" +version = "0.43.0" +description = "lightweight wrapper around basic LLVM functionality" +optional = false +python-versions = ">=3.9" +files = [ + {file = "llvmlite-0.43.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:a289af9a1687c6cf463478f0fa8e8aa3b6fb813317b0d70bf1ed0759eab6f761"}, + {file = "llvmlite-0.43.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:6d4fd101f571a31acb1559ae1af30f30b1dc4b3186669f92ad780e17c81e91bc"}, + {file = "llvmlite-0.43.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7d434ec7e2ce3cc8f452d1cd9a28591745de022f931d67be688a737320dfcead"}, + {file = "llvmlite-0.43.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6912a87782acdff6eb8bf01675ed01d60ca1f2551f8176a300a886f09e836a6a"}, + {file = "llvmlite-0.43.0-cp310-cp310-win_amd64.whl", hash = "sha256:14f0e4bf2fd2d9a75a3534111e8ebeb08eda2f33e9bdd6dfa13282afacdde0ed"}, + {file = "llvmlite-0.43.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:3e8d0618cb9bfe40ac38a9633f2493d4d4e9fcc2f438d39a4e854f39cc0f5f98"}, + {file = "llvmlite-0.43.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:e0a9a1a39d4bf3517f2af9d23d479b4175ead205c592ceeb8b89af48a327ea57"}, + {file = "llvmlite-0.43.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c1da416ab53e4f7f3bc8d4eeba36d801cc1894b9fbfbf2022b29b6bad34a7df2"}, + {file = "llvmlite-0.43.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:977525a1e5f4059316b183fb4fd34fa858c9eade31f165427a3977c95e3ee749"}, + {file = "llvmlite-0.43.0-cp311-cp311-win_amd64.whl", hash = "sha256:d5bd550001d26450bd90777736c69d68c487d17bf371438f975229b2b8241a91"}, + {file = "llvmlite-0.43.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:f99b600aa7f65235a5a05d0b9a9f31150c390f31261f2a0ba678e26823ec38f7"}, + {file = "llvmlite-0.43.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:35d80d61d0cda2d767f72de99450766250560399edc309da16937b93d3b676e7"}, + {file = "llvmlite-0.43.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:eccce86bba940bae0d8d48ed925f21dbb813519169246e2ab292b5092aba121f"}, + {file = "llvmlite-0.43.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:df6509e1507ca0760787a199d19439cc887bfd82226f5af746d6977bd9f66844"}, + {file = "llvmlite-0.43.0-cp312-cp312-win_amd64.whl", hash = "sha256:7a2872ee80dcf6b5dbdc838763d26554c2a18aa833d31a2635bff16aafefb9c9"}, + {file = "llvmlite-0.43.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9cd2a7376f7b3367019b664c21f0c61766219faa3b03731113ead75107f3b66c"}, + {file = "llvmlite-0.43.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:18e9953c748b105668487b7c81a3e97b046d8abf95c4ddc0cd3c94f4e4651ae8"}, + {file = "llvmlite-0.43.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:74937acd22dc11b33946b67dca7680e6d103d6e90eeaaaf932603bec6fe7b03a"}, + {file = "llvmlite-0.43.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc9efc739cc6ed760f795806f67889923f7274276f0eb45092a1473e40d9b867"}, + {file = "llvmlite-0.43.0-cp39-cp39-win_amd64.whl", hash = "sha256:47e147cdda9037f94b399bf03bfd8a6b6b1f2f90be94a454e3386f006455a9b4"}, + {file = "llvmlite-0.43.0.tar.gz", hash = "sha256:ae2b5b5c3ef67354824fb75517c8db5fbe93bc02cd9671f3c62271626bc041d5"}, +] + [[package]] name = "loguru" version 
= "0.6.0" @@ -584,6 +614,94 @@ files = [ {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, ] +[[package]] +name = "numba" +version = "0.60.0" +description = "compiling Python code using LLVM" +optional = false +python-versions = ">=3.9" +files = [ + {file = "numba-0.60.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5d761de835cd38fb400d2c26bb103a2726f548dc30368853121d66201672e651"}, + {file = "numba-0.60.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:159e618ef213fba758837f9837fb402bbe65326e60ba0633dbe6c7f274d42c1b"}, + {file = "numba-0.60.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:1527dc578b95c7c4ff248792ec33d097ba6bef9eda466c948b68dfc995c25781"}, + {file = "numba-0.60.0-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:fe0b28abb8d70f8160798f4de9d486143200f34458d34c4a214114e445d7124e"}, + {file = "numba-0.60.0-cp310-cp310-win_amd64.whl", hash = "sha256:19407ced081d7e2e4b8d8c36aa57b7452e0283871c296e12d798852bc7d7f198"}, + {file = "numba-0.60.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a17b70fc9e380ee29c42717e8cc0bfaa5556c416d94f9aa96ba13acb41bdece8"}, + {file = "numba-0.60.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3fb02b344a2a80efa6f677aa5c40cd5dd452e1b35f8d1c2af0dfd9ada9978e4b"}, + {file = "numba-0.60.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:5f4fde652ea604ea3c86508a3fb31556a6157b2c76c8b51b1d45eb40c8598703"}, + {file = "numba-0.60.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:4142d7ac0210cc86432b818338a2bc368dc773a2f5cf1e32ff7c5b378bd63ee8"}, + {file = "numba-0.60.0-cp311-cp311-win_amd64.whl", hash = "sha256:cac02c041e9b5bc8cf8f2034ff6f0dbafccd1ae9590dc146b3a02a45e53af4e2"}, + {file = "numba-0.60.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d7da4098db31182fc5ffe4bc42c6f24cd7d1cb8a14b59fd755bfee32e34b8404"}, + {file = "numba-0.60.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:38d6ea4c1f56417076ecf8fc327c831ae793282e0ff51080c5094cb726507b1c"}, + {file = "numba-0.60.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:62908d29fb6a3229c242e981ca27e32a6e606cc253fc9e8faeb0e48760de241e"}, + {file = "numba-0.60.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:0ebaa91538e996f708f1ab30ef4d3ddc344b64b5227b67a57aa74f401bb68b9d"}, + {file = "numba-0.60.0-cp312-cp312-win_amd64.whl", hash = "sha256:f75262e8fe7fa96db1dca93d53a194a38c46da28b112b8a4aca168f0df860347"}, + {file = "numba-0.60.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:01ef4cd7d83abe087d644eaa3d95831b777aa21d441a23703d649e06b8e06b74"}, + {file = "numba-0.60.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:819a3dfd4630d95fd574036f99e47212a1af41cbcb019bf8afac63ff56834449"}, + {file = "numba-0.60.0-cp39-cp39-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:0b983bd6ad82fe868493012487f34eae8bf7dd94654951404114f23c3466d34b"}, + {file = "numba-0.60.0-cp39-cp39-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c151748cd269ddeab66334bd754817ffc0cabd9433acb0f551697e5151917d25"}, + {file = "numba-0.60.0-cp39-cp39-win_amd64.whl", hash = "sha256:3031547a015710140e8c87226b4cfe927cac199835e5bf7d4fe5cb64e814e3ab"}, + {file = "numba-0.60.0.tar.gz", hash = "sha256:5df6158e5584eece5fc83294b949fd30b9f1125df7708862205217e068aabf16"}, +] + +[package.dependencies] +llvmlite = "==0.43.*" +numpy = ">=1.22,<2.1" + 
+[[package]] +name = "numpy" +version = "2.0.2" +description = "Fundamental package for array computing in Python" +optional = false +python-versions = ">=3.9" +files = [ + {file = "numpy-2.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:51129a29dbe56f9ca83438b706e2e69a39892b5eda6cedcb6b0c9fdc9b0d3ece"}, + {file = "numpy-2.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f15975dfec0cf2239224d80e32c3170b1d168335eaedee69da84fbe9f1f9cd04"}, + {file = "numpy-2.0.2-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:8c5713284ce4e282544c68d1c3b2c7161d38c256d2eefc93c1d683cf47683e66"}, + {file = "numpy-2.0.2-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:becfae3ddd30736fe1889a37f1f580e245ba79a5855bff5f2a29cb3ccc22dd7b"}, + {file = "numpy-2.0.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2da5960c3cf0df7eafefd806d4e612c5e19358de82cb3c343631188991566ccd"}, + {file = "numpy-2.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:496f71341824ed9f3d2fd36cf3ac57ae2e0165c143b55c3a035ee219413f3318"}, + {file = "numpy-2.0.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:a61ec659f68ae254e4d237816e33171497e978140353c0c2038d46e63282d0c8"}, + {file = "numpy-2.0.2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:d731a1c6116ba289c1e9ee714b08a8ff882944d4ad631fd411106a30f083c326"}, + {file = "numpy-2.0.2-cp310-cp310-win32.whl", hash = "sha256:984d96121c9f9616cd33fbd0618b7f08e0cfc9600a7ee1d6fd9b239186d19d97"}, + {file = "numpy-2.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:c7b0be4ef08607dd04da4092faee0b86607f111d5ae68036f16cc787e250a131"}, + {file = "numpy-2.0.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:49ca4decb342d66018b01932139c0961a8f9ddc7589611158cb3c27cbcf76448"}, + {file = "numpy-2.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:11a76c372d1d37437857280aa142086476136a8c0f373b2e648ab2c8f18fb195"}, + {file = "numpy-2.0.2-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:807ec44583fd708a21d4a11d94aedf2f4f3c3719035c76a2bbe1fe8e217bdc57"}, + {file = "numpy-2.0.2-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:8cafab480740e22f8d833acefed5cc87ce276f4ece12fdaa2e8903db2f82897a"}, + {file = "numpy-2.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a15f476a45e6e5a3a79d8a14e62161d27ad897381fecfa4a09ed5322f2085669"}, + {file = "numpy-2.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13e689d772146140a252c3a28501da66dfecd77490b498b168b501835041f951"}, + {file = "numpy-2.0.2-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:9ea91dfb7c3d1c56a0e55657c0afb38cf1eeae4544c208dc465c3c9f3a7c09f9"}, + {file = "numpy-2.0.2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:c1c9307701fec8f3f7a1e6711f9089c06e6284b3afbbcd259f7791282d660a15"}, + {file = "numpy-2.0.2-cp311-cp311-win32.whl", hash = "sha256:a392a68bd329eafac5817e5aefeb39038c48b671afd242710b451e76090e81f4"}, + {file = "numpy-2.0.2-cp311-cp311-win_amd64.whl", hash = "sha256:286cd40ce2b7d652a6f22efdfc6d1edf879440e53e76a75955bc0c826c7e64dc"}, + {file = "numpy-2.0.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:df55d490dea7934f330006d0f81e8551ba6010a5bf035a249ef61a94f21c500b"}, + {file = "numpy-2.0.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8df823f570d9adf0978347d1f926b2a867d5608f434a7cff7f7908c6570dcf5e"}, + {file = "numpy-2.0.2-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:9a92ae5c14811e390f3767053ff54eaee3bf84576d99a2456391401323f4ec2c"}, + {file = 
"numpy-2.0.2-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:a842d573724391493a97a62ebbb8e731f8a5dcc5d285dfc99141ca15a3302d0c"}, + {file = "numpy-2.0.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c05e238064fc0610c840d1cf6a13bf63d7e391717d247f1bf0318172e759e692"}, + {file = "numpy-2.0.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0123ffdaa88fa4ab64835dcbde75dcdf89c453c922f18dced6e27c90d1d0ec5a"}, + {file = "numpy-2.0.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:96a55f64139912d61de9137f11bf39a55ec8faec288c75a54f93dfd39f7eb40c"}, + {file = "numpy-2.0.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:ec9852fb39354b5a45a80bdab5ac02dd02b15f44b3804e9f00c556bf24b4bded"}, + {file = "numpy-2.0.2-cp312-cp312-win32.whl", hash = "sha256:671bec6496f83202ed2d3c8fdc486a8fc86942f2e69ff0e986140339a63bcbe5"}, + {file = "numpy-2.0.2-cp312-cp312-win_amd64.whl", hash = "sha256:cfd41e13fdc257aa5778496b8caa5e856dc4896d4ccf01841daee1d96465467a"}, + {file = "numpy-2.0.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9059e10581ce4093f735ed23f3b9d283b9d517ff46009ddd485f1747eb22653c"}, + {file = "numpy-2.0.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:423e89b23490805d2a5a96fe40ec507407b8ee786d66f7328be214f9679df6dd"}, + {file = "numpy-2.0.2-cp39-cp39-macosx_14_0_arm64.whl", hash = "sha256:2b2955fa6f11907cf7a70dab0d0755159bca87755e831e47932367fc8f2f2d0b"}, + {file = "numpy-2.0.2-cp39-cp39-macosx_14_0_x86_64.whl", hash = "sha256:97032a27bd9d8988b9a97a8c4d2c9f2c15a81f61e2f21404d7e8ef00cb5be729"}, + {file = "numpy-2.0.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1e795a8be3ddbac43274f18588329c72939870a16cae810c2b73461c40718ab1"}, + {file = "numpy-2.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f26b258c385842546006213344c50655ff1555a9338e2e5e02a0756dc3e803dd"}, + {file = "numpy-2.0.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5fec9451a7789926bcf7c2b8d187292c9f93ea30284802a0ab3f5be8ab36865d"}, + {file = "numpy-2.0.2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:9189427407d88ff25ecf8f12469d4d39d35bee1db5d39fc5c168c6f088a6956d"}, + {file = "numpy-2.0.2-cp39-cp39-win32.whl", hash = "sha256:905d16e0c60200656500c95b6b8dca5d109e23cb24abc701d41c02d74c6b3afa"}, + {file = "numpy-2.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:a3f4ab0caa7f053f6797fcd4e1e25caee367db3112ef2b6ef82d749530768c73"}, + {file = "numpy-2.0.2-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:7f0a0c6f12e07fa94133c8a67404322845220c06a9e80e85999afe727f7438b8"}, + {file = "numpy-2.0.2-pp39-pypy39_pp73-macosx_14_0_x86_64.whl", hash = "sha256:312950fdd060354350ed123c0e25a71327d3711584beaef30cdaa93320c392d4"}, + {file = "numpy-2.0.2-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:26df23238872200f63518dd2aa984cfca675d82469535dc7162dc2ee52d9dd5c"}, + {file = "numpy-2.0.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:a46288ec55ebbd58947d31d72be2c63cbf839f0a63b49cb755022310792a3385"}, + {file = "numpy-2.0.2.tar.gz", hash = "sha256:883c987dee1880e2a864ab0dc9892292582510604156762362d9326444636e78"}, +] + [[package]] name = "packaging" version = "24.1" @@ -1284,4 +1402,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "d1ddbb21b3fb9146a0207b9b66fe575dd90562ab2980d2d7caa7b3099d445909" +content-hash = "587f950604c9798f0dade7eab6fc45b69f24347fa6b7d835e75b631f0dbe836f" diff --git 
a/jobbergate-agent/pyproject.toml b/jobbergate-agent/pyproject.toml index fe34a3490..5d9e3d8a0 100644 --- a/jobbergate-agent/pyproject.toml +++ b/jobbergate-agent/pyproject.toml @@ -36,6 +36,7 @@ auto-name-enum = "^2.0.0" pydantic-settings = "^2.3.3" msgpack = "^1.1.0" influxdb = "^5.3.2" +numba = "^0.60.0" [tool.stickywheel] # This will resolve the relative path to the jobbergate-core package at build time @@ -97,7 +98,7 @@ show_missing = true plugins = "pydantic.mypy" [[tool.mypy.overrides]] -module = ["dataclasses", "toml", "requests", "jose.*", "apscheduler.*", "influxdb", "msgpack"] +module = ["dataclasses", "toml", "requests", "jose.*", "apscheduler.*", "influxdb", "msgpack", "numba"] ignore_missing_imports = true [build-system] diff --git a/jobbergate-agent/tests/jobbergate/test_compute.py b/jobbergate-agent/tests/jobbergate/test_compute.py new file mode 100644 index 000000000..69984b3c0 --- /dev/null +++ b/jobbergate-agent/tests/jobbergate/test_compute.py @@ -0,0 +1,307 @@ +"""Define tests for the functions in jobbergate_agent/utils/compute.py.""" + +import pytest +import random +import time +from collections.abc import Callable +from datetime import datetime +from typing import cast, get_args +from unittest import mock + +import numpy as np + +from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT +from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure +from jobbergate_agent.utils.compute import ( + aggregate_influx_measures, + measure_memory_usage, + _create_mapping, + _aggregate_with_numba, +) + + +@pytest.fixture() +def generate_and_aggregate_job_metrics_data() -> ( + Callable[ + [int, int, int, int, int], + tuple[ + list[InfluxDBMeasure], + list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + ], + ] +): + """ + Generates sample InfluxDB data and its aggregated counterpart. + + Returns a function that creates both the list of measures and their aggregated version. 
+ """ + + def _generate_and_aggregate( + num_points_per_measurement: int, num_hosts: int, num_jobs: int, num_steps: int, num_tasks: int + ) -> tuple[ + list[InfluxDBMeasure], + list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + ]: + # Initialize data structures + current_time = int(datetime.now().timestamp()) + measurement_names = get_args(INFLUXDB_MEASUREMENT) + default_measurements: dict[str, float] = {measurement: 0.0 for measurement in measurement_names} + + measures = [] + aggregated_data: dict[tuple[int, str, str, str], dict[str, float]] = {} + + # Generate measures + for _ in range(num_points_per_measurement): + for host in range(1, num_hosts + 1): + for job in range(1, num_jobs + 1): + for step in range(1, num_steps + 1): + for task in range(1, num_tasks + 1): + key = (current_time, f"host_{host}", str(step), str(task)) + + if key not in aggregated_data: + aggregated_data[key] = default_measurements.copy() + + for measurement in measurement_names: + value = random.random() * 100 + measure = InfluxDBMeasure( + **{ + "time": current_time, + "host": f"host_{host}", + "job": str(job), + "step": str(step), + "task": str(task), + "value": value, + "measurement": measurement, + } + ) + measures.append(measure) + + # Aggregate value + aggregated_data[key][measurement] = value + current_time += 10 + + # Create aggregated list + aggregated_list = cast( + list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + [ + ( + time, + host, + step, + task, + *(aggregated_data[(time, host, step, task)][measurement] for measurement in measurement_names), + ) + for (time, host, step, task) in aggregated_data + ], + ) + + return measures, aggregated_list + + return _generate_and_aggregate + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "num_points_per_measurement, num_hosts, num_jobs, num_steps, num_tasks", + [ + (1, 1, 1, 1, 1), + (3, 10, 1, 5, 10), + (7, 3, 1, 4, 2), + ], +) +async def test_aggregate_influx_measures__success( + num_points_per_measurement: int, + num_hosts: int, + num_jobs: int, + num_steps: int, + num_tasks: int, + generate_and_aggregate_job_metrics_data: Callable[ + [int, int, int, int, int], + tuple[ + list[InfluxDBMeasure], + list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + ], + ], +): + """ + Test that the ``aggregate_influx_measures()`` function can successfully aggregate + a list of InfluxDBMeasure data points. + """ + measures, expected_aggregated_data = generate_and_aggregate_job_metrics_data( + num_points_per_measurement, num_hosts, num_jobs, num_steps, num_tasks + ) + + start_time = time.monotonic() + aggregated_data = aggregate_influx_measures(iter(measures)) + end_time = time.monotonic() + + print(f"Aggregated {len(measures)} data points in {end_time - start_time:.5f} seconds") + + for data_point in aggregated_data: + assert data_point in expected_aggregated_data + + +@pytest.mark.asyncio +async def test_aggregate_influx_measures__empty_data_points(): + """ + Test that the ``aggregate_influx_measures()`` function returns an empty list + when given an empty iterator of data points. 
+ """ + data_points = [] + + result = aggregate_influx_measures(iter(data_points)) + + assert result == [] + + +@pytest.mark.parametrize( + "current, peak", + [(0, 0), (100, 200), (87, 100), (34, 43), (0, 98654), (3245879, 0)], +) +@mock.patch("jobbergate_agent.utils.compute.logger") +@mock.patch("jobbergate_agent.utils.compute.tracemalloc") +def test_measure_memory_usage_decorator( + mocked_tracemalloc: mock.MagicMock, mocked_logger: mock.MagicMock, current: int, peak: int +): + """Test the measure_memory_usage decorator.""" + + mocked_tracemalloc.get_traced_memory.return_value = (current, peak) + + @measure_memory_usage + def dummy_function(): + return sum([i for i in range(10000)]) + + result = dummy_function() + + assert result == sum(range(10000)) + mocked_logger.debug.assert_called_once_with( + f"Memory usage for function '{dummy_function.__name__}': {current=}B, {peak=}B" + ) + mocked_tracemalloc.start.assert_called_once_with() + mocked_tracemalloc.get_traced_memory.assert_called_once_with() + mocked_tracemalloc.stop.assert_called_once_with() + + +@pytest.mark.parametrize( + "current, peak", + [(0, 0), (100, 200), (87, 100), (34, 43), (0, 98654), (3245879, 0)], +) +@mock.patch("jobbergate_agent.utils.compute.logger") +@mock.patch("jobbergate_agent.utils.compute.tracemalloc") +def test_measure_memory_usage_decorator_with_args( + mocked_tracemalloc: mock.MagicMock, mocked_logger: mock.MagicMock, current: int, peak: int +): + """Test the measure_memory_usage decorator with arguments.""" + mocked_tracemalloc.get_traced_memory.return_value = (current, peak) + + @measure_memory_usage + def dummy_function_with_args(a, b): + return a + b + + result = dummy_function_with_args(5, 10) + + assert result == 15 + mocked_logger.debug.assert_called_once_with( + f"Memory usage for function '{dummy_function_with_args.__name__}': {current=}B, {peak=}B" + ) + mocked_tracemalloc.start.assert_called_once_with() + mocked_tracemalloc.get_traced_memory.assert_called_once_with() + mocked_tracemalloc.stop.assert_called_once_with() + + +def test_measure_memory_usage_decorator_logging(caplog): + """Test the measure_memory_usage decorator logging.""" + + @measure_memory_usage + def dummy_function(): + return sum([i for i in range(10000)]) + + with caplog.at_level("DEBUG"): + dummy_function() + + assert any("Memory usage for function 'dummy_function'" in message for message in caplog.messages) + + +def test_create_mapping(): + """Test the _create_mapping function.""" + column = ["apple", "banana", "apple", "orange", "banana", "apple"] + expected_mapping = {"apple": 0, "banana": 1, "orange": 2} + + result = _create_mapping(column) + + assert result == expected_mapping + + +def test_create_mapping_empty(): + """Test the _create_mapping function with an empty list.""" + column = [] + expected_mapping = {} + + result = _create_mapping(column) + + assert result == expected_mapping + + +def test_create_mapping_single_value(): + """Test the _create_mapping function with a single value list.""" + column = ["apple"] + expected_mapping = {"apple": 0} + + result = _create_mapping(column) + + assert result == expected_mapping + + +def test_create_mapping_multiple_unique_values(): + """Test the _create_mapping function with multiple unique values.""" + column = ["apple", "banana", "cherry", "date"] + expected_mapping = {"apple": 0, "banana": 1, "cherry": 2, "date": 3} + + result = _create_mapping(column) + + assert result == expected_mapping + + +def test_aggregate_with_numba(): + """Test the _aggregate_with_numba 
function.""" + values = np.array([1.0, 2.0, 3.0, 4.0, 5.0]) + key_indices = np.array([0, 1, 0, 1, 0]) + measurement_indices = np.array([0, 0, 1, 1, 0]) + num_keys = 2 + num_measurements = 2 + + expected_aggregated_values = np.array([[6.0, 3.0], [2.0, 4.0]]) + + result = _aggregate_with_numba(values, key_indices, measurement_indices, num_keys, num_measurements) + + np.testing.assert_array_equal(result, expected_aggregated_values) + + +def test_aggregate_with_numba_single_value(): + """Test the _aggregate_with_numba function with a single value.""" + values = np.array([1.0]) + key_indices = np.array([0]) + measurement_indices = np.array([0]) + num_keys = 1 + num_measurements = 1 + + expected_aggregated_values = np.array([[1.0]]) + + result = _aggregate_with_numba(values, key_indices, measurement_indices, num_keys, num_measurements) + + np.testing.assert_array_equal(result, expected_aggregated_values) + + +def test_aggregate_with_numba_multiple_measurements(): + """Test the _aggregate_with_numba function with multiple measurements.""" + values = np.array([1.0, 2.0, 3.0, 4.0]) + key_indices = np.array([0, 0, 1, 1]) + measurement_indices = np.array([0, 1, 0, 1]) + num_keys = 2 + num_measurements = 2 + + expected_aggregated_values = np.array([[1.0, 2.0], [3.0, 4.0]]) + + result = _aggregate_with_numba(values, key_indices, measurement_indices, num_keys, num_measurements) + + np.testing.assert_array_equal(result, expected_aggregated_values) diff --git a/jobbergate-agent/tests/jobbergate/test_update.py b/jobbergate-agent/tests/jobbergate/test_update.py index d238aa263..862f27381 100644 --- a/jobbergate-agent/tests/jobbergate/test_update.py +++ b/jobbergate-agent/tests/jobbergate/test_update.py @@ -1,17 +1,16 @@ import json import random -import time from datetime import datetime -from typing import get_args, cast +from typing import get_args from unittest import mock -from collections.abc import Callable, Generator, Iterator +from collections.abc import Callable import contextlib import httpx import pytest import respx -from jobbergate_agent.jobbergate.schemas import ActiveJobSubmission, SlurmJobData, InfluxDBMeasure +from jobbergate_agent.jobbergate.schemas import ActiveJobSubmission, SlurmJobData from jobbergate_agent.jobbergate.update import ( fetch_active_submissions, fetch_job_data, @@ -19,7 +18,6 @@ update_job_data, fetch_influx_data, fetch_influx_measurements, - aggregate_influx_measures, update_job_metrics, ) from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT @@ -27,72 +25,6 @@ from jobbergate_agent.utils.exception import JobbergateApiError -@pytest.fixture() -def generate_job_metrics_data() -> Callable[[int, int, int, int, int], Generator[InfluxDBMeasure]]: - """Generates sample InfluxDB data with multiple points per measurement.""" - - def _generate_influxdb_data( - num_points_per_measurement: int, num_hosts: int, num_jobs: int, num_steps: int, num_tasks: int - ) -> Generator[InfluxDBMeasure]: - current_time = int(datetime.now().timestamp()) - - for host in range(1, num_hosts + 1): - for job in range(1, num_jobs + 1): - for step in range(1, num_steps + 1): - for task in range(1, num_tasks + 1): - for measurement in get_args(INFLUXDB_MEASUREMENT): - for _ in range(num_points_per_measurement): - yield { - "time": current_time, - "host": f"host_{host}", - "job": str(job), - "step": str(step), - "task": str(task), - "value": random.random() * 100, - "measurement": measurement, - } - current_time += random.randint(30, 60) # increment time by a random interval - - return 
_generate_influxdb_data - - -@pytest.fixture() -def aggregate_job_metrics_data(): - """Generates aggregated InfluxDB data with multiple points per measurement.""" - - def _aggregate_influxdb_data( - data_points: Iterator[InfluxDBMeasure], - ) -> list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]]: - measurement_names = get_args(INFLUXDB_MEASUREMENT) - default_measurements: dict[str, float] = {measurement: 0.0 for measurement in measurement_names} - - aggregated_data: dict[tuple[int, str, str, str], dict[str, float]] = {} - - for measure in data_points: - key = (measure["time"], measure["host"], measure["step"], measure["task"]) - - # aggregate measurements lazily to avoid creating a new dict for each point - if key not in aggregated_data: - aggregated_data[key] = default_measurements.copy() - aggregated_data[key][measure["measurement"]] = measure["value"] - - return cast( - list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], - [ - ( - time, - host, - step, - task, - *(aggregated_data[(time, host, step, task)][measurement] for measurement in measurement_names), - ) - for (time, host, step, task) in aggregated_data - ], - ) - - return _aggregate_influxdb_data - - @pytest.fixture() def job_max_times_response() -> Callable[[int, int, int, int], dict[str, int | list[dict[str, int | str]]]]: """Generates a sample response for the endpoint @@ -550,65 +482,6 @@ async def test_fetch_influx_measurements__raises_JobbergateApiError_if_query_fai mocked_influxdb_client.get_list_measurements.assert_called_once_with() -@pytest.mark.asyncio -@pytest.mark.parametrize( - "num_points_per_measurement, num_hosts, num_jobs, num_steps, num_tasks", - [ - (3, 5, 2, 7, 4), - (1, 1, 1, 1, 1), - (7, 3, 10, 4, 2), - ], -) -async def test_aggregate_influx_measures__success( - num_points_per_measurement: int, - num_hosts: int, - num_jobs: int, - num_steps: int, - num_tasks: int, - generate_job_metrics_data: Callable[[int, int, int, int, int], Generator[InfluxDBMeasure]], - aggregate_job_metrics_data: Callable[ - [Iterator[InfluxDBMeasure]], - list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], - ], -): - """ - Test that the ``aggregate_influx_measures()`` function can successfully aggregate - a list of InfluxDBMeasure data points. - """ - data_points = list( - generate_job_metrics_data( - num_points_per_measurement, - num_hosts, - num_jobs, - num_steps, - num_tasks, - ) - ) - - start_time = time.monotonic() - result = aggregate_influx_measures(iter(data_points)) - end_time = time.monotonic() - - expected_result = aggregate_job_metrics_data(iter(data_points)) - - assert result == expected_result - - print(f"Aggregated {len(data_points)} data points in {end_time - start_time:.5f} seconds") - - -@pytest.mark.asyncio -async def test_aggregate_influx_measures__empty_data_points(): - """ - Test that the ``aggregate_influx_measures()`` function returns an empty list - when given an empty iterator of data points. 
- """ - data_points = [] - - result = aggregate_influx_measures(iter(data_points)) - - assert result == [] - - @pytest.mark.asyncio @pytest.mark.usefixtures("mock_access_token") @pytest.mark.parametrize( From e17f8bf854a3de01ee7ade2226c78935d4ff61e3 Mon Sep 17 00:00:00 2001 From: matheushent Date: Wed, 11 Dec 2024 17:13:59 +0000 Subject: [PATCH 06/13] PENG-2457 set the content type as application/octet-stream when uploading the metrics to the API --- jobbergate-agent/jobbergate_agent/jobbergate/update.py | 4 +++- jobbergate-agent/tests/jobbergate/test_update.py | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/update.py b/jobbergate-agent/jobbergate_agent/jobbergate/update.py index 9203bf9f8..c3a8aa7a0 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/update.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/update.py @@ -158,7 +158,9 @@ async def update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None packed_data = msgpack.packb(aggregated_data_points) response = await jobbergate_api_client.put( - f"jobbergate/job-submissions/agent/metrics/{active_job_submittion.id}", content=packed_data + f"jobbergate/job-submissions/agent/metrics/{active_job_submittion.id}", + content=packed_data, + headers={"Content-Type": "application/octet-stream"}, ) response.raise_for_status() diff --git a/jobbergate-agent/tests/jobbergate/test_update.py b/jobbergate-agent/tests/jobbergate/test_update.py index 862f27381..9b949949d 100644 --- a/jobbergate-agent/tests/jobbergate/test_update.py +++ b/jobbergate-agent/tests/jobbergate/test_update.py @@ -575,6 +575,7 @@ async def test_update_job_metrics__error_sending_metrics_to_api( respx.put( f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}", content=b"dummy-msgpack-data", + headers={"Content-Type": "application/octet-stream"}, ).mock(return_value=httpx.Response(status_code=400)) with pytest.raises( @@ -669,6 +670,7 @@ async def test_update_job_metrics__success( respx.put( f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/metrics/{job_submission_id}", content=b"dummy-msgpack-data", + headers={"Content-Type": "application/octet-stream"}, ).mock(return_value=httpx.Response(status_code=200)) await update_job_metrics(active_job_submission) From 27c0e6b82a2497516187597efc0595e616c08541 Mon Sep 17 00:00:00 2001 From: matheushent Date: Thu, 12 Dec 2024 18:36:37 +0000 Subject: [PATCH 07/13] PENG-2457 collapse if-else condition regarding the influx SSL configurations --- jobbergate-agent/jobbergate_agent/settings.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/jobbergate-agent/jobbergate_agent/settings.py b/jobbergate-agent/jobbergate_agent/settings.py index 0c9540e4c..f742c22b6 100644 --- a/jobbergate-agent/jobbergate_agent/settings.py +++ b/jobbergate-agent/jobbergate_agent/settings.py @@ -99,12 +99,11 @@ def compute_extra_settings(self) -> Self: @model_validator(mode="after") def validate_influxdb_settings(self) -> Self: if self.influx_integration_enabled: - if self.INFLUX_SSL: - buzz.require_condition( - self.INFLUX_CERT_PATH is not None, - "INFLUX_CERT_PATH must be provided when INFLUX_SSL is enabled", - ValueError, - ) + buzz.require_condition( + not self.INFLUX_SSL or self.INFLUX_CERT_PATH is not None, + "INFLUX_CERT_PATH must be provided when INFLUX_SSL is enabled", + ValueError, + ) assert self.INFLUX_DSN is not None if self.INFLUX_DSN.scheme not in ["influxdb", "https+influxdb", "udp+influxdb"]: From 
77fd610ad70a8bc1dec2c9395c8c51fd7ba47db4 Mon Sep 17 00:00:00 2001
From: matheushent
Date: Thu, 12 Dec 2024 18:41:22 +0000
Subject: [PATCH 08/13] PENG-2457 improve the docstring of the function
 *measure_memory_usage*

---
 jobbergate-agent/jobbergate_agent/utils/compute.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/jobbergate-agent/jobbergate_agent/utils/compute.py b/jobbergate-agent/jobbergate_agent/utils/compute.py
index 0ca13677f..882029bcb 100644
--- a/jobbergate-agent/jobbergate_agent/utils/compute.py
+++ b/jobbergate-agent/jobbergate_agent/utils/compute.py
@@ -19,7 +19,7 @@ def measure_memory_usage(func: Callable) -> Callable:
     """Decorator to measure the memory usage of a function.
 
     Args:
-        func: Function to measure memory usage of.
+        func: Function whose memory usage should be measured.
 
     Returns:
         Decorated function.

From 36108aadf642bd4b3a73c214ba735f3a698d5207 Mon Sep 17 00:00:00 2001
From: matheushent
Date: Thu, 12 Dec 2024 18:42:30 +0000
Subject: [PATCH 09/13] PENG-2457 rename the INFLUXDB_CLIENT object to lower
 case

---
 .../jobbergate_agent/clients/influx.py        |  2 +-
 .../jobbergate_agent/jobbergate/schemas.py    |  4 ++--
 .../jobbergate_agent/jobbergate/update.py     | 10 +++++-----
 .../tests/jobbergate/test_update.py           | 18 +++++++++---------
 4 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/jobbergate-agent/jobbergate_agent/clients/influx.py b/jobbergate-agent/jobbergate_agent/clients/influx.py
index 4465ff326..ee5c0e705 100644
--- a/jobbergate-agent/jobbergate_agent/clients/influx.py
+++ b/jobbergate-agent/jobbergate_agent/clients/influx.py
@@ -24,4 +24,4 @@ def initialize_influx_client() -> None | InfluxDBClient:
     return None
 
 
-INFLUXDB_CLIENT = initialize_influx_client()
+influxdb_client = initialize_influx_client()

diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
index d85bec017..c265b0bb1 100644
--- a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
+++ b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
@@ -102,7 +102,7 @@ class InfluxDBMeasure(TypedDict):
     measurement: INFLUXDB_MEASUREMENT
 
 
-class JobSubmissionMetricsMaxTimes(pydantic.BaseModel):
+class JobSubmissionMetricsMaxTime(pydantic.BaseModel):
     """
     Model for the max_times field of the JobSubmissionMetricsMaxResponse.
""" @@ -119,4 +119,4 @@ class JobSubmissionMetricsMaxResponse(pydantic.BaseModel): """ job_submission_id: int - max_times: list[JobSubmissionMetricsMaxTimes] + max_times: list[JobSubmissionMetricsMaxTime] diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/update.py b/jobbergate-agent/jobbergate_agent/jobbergate/update.py index c3a8aa7a0..a049ec31e 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/update.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/update.py @@ -8,7 +8,7 @@ from loguru import logger from jobbergate_agent.clients.cluster_api import backend_client as jobbergate_api_client -from jobbergate_agent.clients.influx import INFLUXDB_CLIENT +from jobbergate_agent.clients.influx import influxdb_client from jobbergate_agent.jobbergate.schemas import ( ActiveJobSubmission, SlurmJobData, @@ -92,10 +92,10 @@ async def fetch_influx_data( SELECT * FROM {measurement} WHERE time > $time AND host = $host AND step = $step AND task = $task AND job = $job """ with JobbergateApiError.handle_errors("Failed to fetch data from InfluxDB", do_except=log_error): - assert INFLUXDB_CLIENT is not None + assert influxdb_client is not None params = dict(time=time, host=host, step=str(step), task=str(task), job=str(job)) logger.debug(f"Querying InfluxDB with: {query=}, {params=}") - result = INFLUXDB_CLIENT.query(query, bind_params=params, epoch="us") + result = influxdb_client.query(query, bind_params=params, epoch="us") logger.debug("Successfully fetched data from InfluxDB") return [ InfluxDBMeasure( @@ -117,8 +117,8 @@ def fetch_influx_measurements() -> list[InfluxDBMeasurement]: """ with JobbergateApiError.handle_errors("Failed to fetch measurements from InfluxDB", do_except=log_error): logger.debug("Fetching measurements from InfluxDB") - assert INFLUXDB_CLIENT is not None - measurements: list[InfluxDBMeasurement] = INFLUXDB_CLIENT.get_list_measurements() + assert influxdb_client is not None + measurements: list[InfluxDBMeasurement] = influxdb_client.get_list_measurements() logger.debug(f"Fetched measurements from InfluxDB: {measurements=}") return measurements diff --git a/jobbergate-agent/tests/jobbergate/test_update.py b/jobbergate-agent/tests/jobbergate/test_update.py index 9b949949d..3b3dd3e60 100644 --- a/jobbergate-agent/tests/jobbergate/test_update.py +++ b/jobbergate-agent/tests/jobbergate/test_update.py @@ -244,7 +244,7 @@ async def test_update_active_jobs( mocker.patch("jobbergate_agent.jobbergate.update.InfoHandler", return_value=mocked_sbatch) mocked_influxdb_client = mock.MagicMock() - mocker.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT", return_value=mocked_influxdb_client) + mocker.patch("jobbergate_agent.jobbergate.update.influxdb_client", return_value=mocked_influxdb_client) active_job_submissions = [ ActiveJobSubmission(id=1, slurm_job_id=11), # Will update @@ -332,7 +332,7 @@ def _mocked_update_job_data(job_submission_id, slurm_job_data): @pytest.mark.asyncio -@mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT") +@mock.patch("jobbergate_agent.jobbergate.update.influxdb_client") async def test_fetch_influx_data__success(mocked_influxdb_client: mock.MagicMock): """ Test that the ``fetch_influx_data()`` function can successfully retrieve @@ -377,7 +377,7 @@ async def test_fetch_influx_data__success(mocked_influxdb_client: mock.MagicMock @pytest.mark.asyncio -@mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT") +@mock.patch("jobbergate_agent.jobbergate.update.influxdb_client") async def 
test_fetch_influx_data__raises_JobbergateApiError_if_query_fails(mocked_influxdb_client: mock.MagicMock): """ Test that the ``fetch_influx_data()`` function will raise a JobbergateApiError @@ -415,10 +415,10 @@ async def test_fetch_influx_data__raises_JobbergateApiError_if_query_fails(mocke async def test_fetch_influx_data__raises_JobbergateApiError_if_influxdb_client_is_None(): """ Test that the ``fetch_influx_data()`` function will raise a JobbergateApiError - if the INFLUXDB_CLIENT is None. + if the influxdb_client is None. """ measurement = random.choice(get_args(INFLUXDB_MEASUREMENT)) - with mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT", None): + with mock.patch("jobbergate_agent.jobbergate.update.influxdb_client", None): with pytest.raises(JobbergateApiError, match="Failed to fetch data from InfluxDB"): await fetch_influx_data( time=random.randint(0, 1000), @@ -439,7 +439,7 @@ async def test_fetch_influx_data__raises_JobbergateApiError_if_influxdb_client_i [], ], ) -@mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT") +@mock.patch("jobbergate_agent.jobbergate.update.influxdb_client") async def test_fetch_influx_measurements__success( mocked_influxdb_client: mock.MagicMock, measurements: list[dict[str, str]] ): @@ -458,15 +458,15 @@ async def test_fetch_influx_measurements__success( async def test_fetch_influx_measurements__raises_JobbergateApiError_if_influxdb_client_is_None(): """ Test that the ``fetch_influx_measurements()`` function will raise a JobbergateApiError - if the INFLUXDB_CLIENT is None. + if the influxdb_client is None. """ - with mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT", None): + with mock.patch("jobbergate_agent.jobbergate.update.influxdb_client", None): with pytest.raises(JobbergateApiError, match="Failed to fetch measurements from InfluxDB"): fetch_influx_measurements() @pytest.mark.asyncio -@mock.patch("jobbergate_agent.jobbergate.update.INFLUXDB_CLIENT") +@mock.patch("jobbergate_agent.jobbergate.update.influxdb_client") async def test_fetch_influx_measurements__raises_JobbergateApiError_if_query_fails( mocked_influxdb_client: mock.MagicMock, ): From cf0955b169b1af76a0525107f37bc4cef3d478d6 Mon Sep 17 00:00:00 2001 From: matheushent Date: Thu, 12 Dec 2024 18:53:18 +0000 Subject: [PATCH 10/13] PENG-2457 improve typing by adding a TypeAlias for the job metric data structure expected by the API --- .../jobbergate_agent/jobbergate/schemas.py | 12 +++++++++++- jobbergate-agent/jobbergate_agent/utils/compute.py | 7 +++---- jobbergate-agent/tests/jobbergate/test_compute.py | 10 +++++----- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py index c265b0bb1..3d241d2ed 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import List, Optional, TypedDict +from typing import List, Optional, TypedDict, TypeAlias import pydantic from pydantic import ConfigDict @@ -120,3 +120,13 @@ class JobSubmissionMetricsMaxResponse(pydantic.BaseModel): job_submission_id: int max_times: list[JobSubmissionMetricsMaxTime] + + +""" +Type alias for job metric structure. 
It matches the following sequence of data +(time, host, step, task, CPUFrequency, CPUTime, CPUUtilization, GPUMemMB, +GPUUtilization, Pages, RSS, VMSize, ReadMB, WriteMB) +""" +JobMetricData: TypeAlias = list[ + tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float] +] diff --git a/jobbergate-agent/jobbergate_agent/utils/compute.py b/jobbergate-agent/jobbergate_agent/utils/compute.py index 882029bcb..202a919d6 100644 --- a/jobbergate-agent/jobbergate_agent/utils/compute.py +++ b/jobbergate-agent/jobbergate_agent/utils/compute.py @@ -10,9 +10,8 @@ from loguru import logger from numba import njit -from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure - from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT +from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure, JobMetricData def measure_memory_usage(func: Callable) -> Callable: @@ -64,7 +63,7 @@ def _aggregate_with_numba( @measure_memory_usage def aggregate_influx_measures( data_points: Iterator[InfluxDBMeasure], -) -> list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]]: +) -> JobMetricData: """Aggregate the list of data points by time, host, step and task. The output data is a list of tuples with the following format: @@ -112,7 +111,7 @@ def aggregate_influx_measures( reverse_task_mapping = {v: k for k, v in task_mapping.items()} return cast( - list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + JobMetricData, [ ( int(unique_key[0]), # time diff --git a/jobbergate-agent/tests/jobbergate/test_compute.py b/jobbergate-agent/tests/jobbergate/test_compute.py index 69984b3c0..4799ae63a 100644 --- a/jobbergate-agent/tests/jobbergate/test_compute.py +++ b/jobbergate-agent/tests/jobbergate/test_compute.py @@ -11,7 +11,7 @@ import numpy as np from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT -from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure +from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure, JobMetricData from jobbergate_agent.utils.compute import ( aggregate_influx_measures, measure_memory_usage, @@ -26,7 +26,7 @@ def generate_and_aggregate_job_metrics_data() -> ( [int, int, int, int, int], tuple[ list[InfluxDBMeasure], - list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + JobMetricData, ], ] ): @@ -40,7 +40,7 @@ def _generate_and_aggregate( num_points_per_measurement: int, num_hosts: int, num_jobs: int, num_steps: int, num_tasks: int ) -> tuple[ list[InfluxDBMeasure], - list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + JobMetricData, ]: # Initialize data structures current_time = int(datetime.now().timestamp()) @@ -82,7 +82,7 @@ def _generate_and_aggregate( # Create aggregated list aggregated_list = cast( - list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + JobMetricData, [ ( time, @@ -119,7 +119,7 @@ async def test_aggregate_influx_measures__success( [int, int, int, int, int], tuple[ list[InfluxDBMeasure], - list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + JobMetricData, ], ], ): From a5d64b0e7e224d71df1a60a0b9afae192b9d49e0 Mon Sep 17 00:00:00 2001 From: matheushent Date: Fri, 13 Dec 2024 17:37:19 +0000 Subject: [PATCH 11/13] PENG-2457 add comments explaining some assertions are for mypy --- 
jobbergate-agent/jobbergate_agent/jobbergate/update.py | 2 +- jobbergate-agent/jobbergate_agent/settings.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/update.py b/jobbergate-agent/jobbergate_agent/jobbergate/update.py index a049ec31e..87f80cb6c 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/update.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/update.py @@ -92,7 +92,7 @@ async def fetch_influx_data( SELECT * FROM {measurement} WHERE time > $time AND host = $host AND step = $step AND task = $task AND job = $job """ with JobbergateApiError.handle_errors("Failed to fetch data from InfluxDB", do_except=log_error): - assert influxdb_client is not None + assert influxdb_client is not None # mypy assertion params = dict(time=time, host=host, step=str(step), task=str(task), job=str(job)) logger.debug(f"Querying InfluxDB with: {query=}, {params=}") result = influxdb_client.query(query, bind_params=params, epoch="us") diff --git a/jobbergate-agent/jobbergate_agent/settings.py b/jobbergate-agent/jobbergate_agent/settings.py index f742c22b6..4b5226ee2 100644 --- a/jobbergate-agent/jobbergate_agent/settings.py +++ b/jobbergate-agent/jobbergate_agent/settings.py @@ -105,7 +105,7 @@ def validate_influxdb_settings(self) -> Self: ValueError, ) - assert self.INFLUX_DSN is not None + assert self.INFLUX_DSN is not None # mypy assertion if self.INFLUX_DSN.scheme not in ["influxdb", "https+influxdb", "udp+influxdb"]: raise ValueError("INFLUX_DSN scheme must be one of 'influxdb', 'https+influxdb' or 'udp+influxdb'") return self From 3f0b6f13f92ec33eb4fb3d54fdc4dd39ee5a642f Mon Sep 17 00:00:00 2001 From: matheushent Date: Fri, 13 Dec 2024 17:45:31 +0000 Subject: [PATCH 12/13] PENG-2457 attempt to better differentiate typed dictionaries in *jobbergate_agent/jobbergate/schemas.py* --- .../jobbergate_agent/jobbergate/schemas.py | 4 ++-- .../jobbergate_agent/jobbergate/update.py | 12 ++++++------ jobbergate-agent/jobbergate_agent/utils/compute.py | 4 ++-- jobbergate-agent/tests/jobbergate/test_compute.py | 12 ++++++------ jobbergate-agent/tests/jobbergate/test_update.py | 2 +- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py index 3d241d2ed..d463334df 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py @@ -80,7 +80,7 @@ class SlurmJobData(pydantic.BaseModel, extra="ignore"): state_reason: Optional[str] = None -class InfluxDBMeasurement(TypedDict): +class InfluxDBMeasurementDict(TypedDict): """ Map each entry in the list returned by `InfluxDBClient(...).get_list_measurements(...)`. """ @@ -88,7 +88,7 @@ class InfluxDBMeasurement(TypedDict): name: INFLUXDB_MEASUREMENT -class InfluxDBMeasure(TypedDict): +class InfluxDBPointDict(TypedDict): """ Map each entry in the generator returned by InfluxDBClient(...).query(...).get_points(). 
""" diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/update.py b/jobbergate-agent/jobbergate_agent/jobbergate/update.py index 87f80cb6c..4c9472316 100644 --- a/jobbergate-agent/jobbergate_agent/jobbergate/update.py +++ b/jobbergate-agent/jobbergate_agent/jobbergate/update.py @@ -13,8 +13,8 @@ ActiveJobSubmission, SlurmJobData, JobSubmissionMetricsMaxResponse, - InfluxDBMeasurement, - InfluxDBMeasure, + InfluxDBMeasurementDict, + InfluxDBPointDict, ) from jobbergate_agent.settings import SETTINGS from jobbergate_agent.utils.exception import JobbergateApiError, SbatchError @@ -84,7 +84,7 @@ async def update_job_data( async def fetch_influx_data( time: int, host: str, step: int, task: int, job: int, measurement: INFLUXDB_MEASUREMENT -) -> list[InfluxDBMeasure]: +) -> list[InfluxDBPointDict]: """ Fetch data from InfluxDB for a given host, step and task. """ @@ -98,7 +98,7 @@ async def fetch_influx_data( result = influxdb_client.query(query, bind_params=params, epoch="us") logger.debug("Successfully fetched data from InfluxDB") return [ - InfluxDBMeasure( + InfluxDBPointDict( time=point["time"], host=point["host"], job=point["job"], @@ -111,14 +111,14 @@ async def fetch_influx_data( ] -def fetch_influx_measurements() -> list[InfluxDBMeasurement]: +def fetch_influx_measurements() -> list[InfluxDBMeasurementDict]: """ Fetch measurements from InfluxDB. """ with JobbergateApiError.handle_errors("Failed to fetch measurements from InfluxDB", do_except=log_error): logger.debug("Fetching measurements from InfluxDB") assert influxdb_client is not None - measurements: list[InfluxDBMeasurement] = influxdb_client.get_list_measurements() + measurements: list[InfluxDBMeasurementDict] = influxdb_client.get_list_measurements() logger.debug(f"Fetched measurements from InfluxDB: {measurements=}") return measurements diff --git a/jobbergate-agent/jobbergate_agent/utils/compute.py b/jobbergate-agent/jobbergate_agent/utils/compute.py index 202a919d6..eddc9e19d 100644 --- a/jobbergate-agent/jobbergate_agent/utils/compute.py +++ b/jobbergate-agent/jobbergate_agent/utils/compute.py @@ -11,7 +11,7 @@ from numba import njit from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT -from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure, JobMetricData +from jobbergate_agent.jobbergate.schemas import InfluxDBPointDict, JobMetricData def measure_memory_usage(func: Callable) -> Callable: @@ -62,7 +62,7 @@ def _aggregate_with_numba( @measure_memory_usage def aggregate_influx_measures( - data_points: Iterator[InfluxDBMeasure], + data_points: Iterator[InfluxDBPointDict], ) -> JobMetricData: """Aggregate the list of data points by time, host, step and task. 
diff --git a/jobbergate-agent/tests/jobbergate/test_compute.py b/jobbergate-agent/tests/jobbergate/test_compute.py index 4799ae63a..e8abf5767 100644 --- a/jobbergate-agent/tests/jobbergate/test_compute.py +++ b/jobbergate-agent/tests/jobbergate/test_compute.py @@ -11,7 +11,7 @@ import numpy as np from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT -from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure, JobMetricData +from jobbergate_agent.jobbergate.schemas import InfluxDBPointDict, JobMetricData from jobbergate_agent.utils.compute import ( aggregate_influx_measures, measure_memory_usage, @@ -25,7 +25,7 @@ def generate_and_aggregate_job_metrics_data() -> ( Callable[ [int, int, int, int, int], tuple[ - list[InfluxDBMeasure], + list[InfluxDBPointDict], JobMetricData, ], ] @@ -39,7 +39,7 @@ def generate_and_aggregate_job_metrics_data() -> ( def _generate_and_aggregate( num_points_per_measurement: int, num_hosts: int, num_jobs: int, num_steps: int, num_tasks: int ) -> tuple[ - list[InfluxDBMeasure], + list[InfluxDBPointDict], JobMetricData, ]: # Initialize data structures @@ -63,7 +63,7 @@ def _generate_and_aggregate( for measurement in measurement_names: value = random.random() * 100 - measure = InfluxDBMeasure( + measure = InfluxDBPointDict( **{ "time": current_time, "host": f"host_{host}", @@ -118,14 +118,14 @@ async def test_aggregate_influx_measures__success( generate_and_aggregate_job_metrics_data: Callable[ [int, int, int, int, int], tuple[ - list[InfluxDBMeasure], + list[InfluxDBPointDict], JobMetricData, ], ], ): """ Test that the ``aggregate_influx_measures()`` function can successfully aggregate - a list of InfluxDBMeasure data points. + a list of InfluxDBPointDict data points. """ measures, expected_aggregated_data = generate_and_aggregate_job_metrics_data( num_points_per_measurement, num_hosts, num_jobs, num_steps, num_tasks diff --git a/jobbergate-agent/tests/jobbergate/test_update.py b/jobbergate-agent/tests/jobbergate/test_update.py index 3b3dd3e60..1313c90ef 100644 --- a/jobbergate-agent/tests/jobbergate/test_update.py +++ b/jobbergate-agent/tests/jobbergate/test_update.py @@ -336,7 +336,7 @@ def _mocked_update_job_data(job_submission_id, slurm_job_data): async def test_fetch_influx_data__success(mocked_influxdb_client: mock.MagicMock): """ Test that the ``fetch_influx_data()`` function can successfully retrieve - data from InfluxDB as a list of ``InfluxDBMeasure``. + data from InfluxDB as a list of ``InfluxDBPointDict``. 
""" time = random.randint(0, 1000) # noqa: F811 host = "test-host" From ab37a463bc292a4f92680d39f30a5dd42d2eddfb Mon Sep 17 00:00:00 2001 From: matheushent Date: Fri, 13 Dec 2024 18:13:34 +0000 Subject: [PATCH 13/13] PENG-2457 remove print line from the *test_aggregate_influx_measures__success* unit test --- jobbergate-agent/tests/jobbergate/test_compute.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/jobbergate-agent/tests/jobbergate/test_compute.py b/jobbergate-agent/tests/jobbergate/test_compute.py index e8abf5767..d1f579b63 100644 --- a/jobbergate-agent/tests/jobbergate/test_compute.py +++ b/jobbergate-agent/tests/jobbergate/test_compute.py @@ -2,7 +2,6 @@ import pytest import random -import time from collections.abc import Callable from datetime import datetime from typing import cast, get_args @@ -131,11 +130,7 @@ async def test_aggregate_influx_measures__success( num_points_per_measurement, num_hosts, num_jobs, num_steps, num_tasks ) - start_time = time.monotonic() aggregated_data = aggregate_influx_measures(iter(measures)) - end_time = time.monotonic() - - print(f"Aggregated {len(measures)} data points in {end_time - start_time:.5f} seconds") for data_point in aggregated_data: assert data_point in expected_aggregated_data