Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an interface from fetching job metrics from InfluxDB #663

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions jobbergate-agent-snap/hooks/bin/configure
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ AGENT_VARIABLES_MAP: dict[str, Union[str, int]] = {
"SLURM_USER_MAPPER": "",
"SINGLE_USER_SUBMITTER": "ubuntu",
"WRITE_SUBMISSION_FILES": "true",
"INFLUX_DSN": "",
"INFLUX_POOL_SIZE": 10,
"INFLUX_SSL": "false",
"INFLUX_VERIFY_SSL": "false",
"INFLUX_TIMEOUT": "",
"INFLUX_UDP_PORT": 4444,
"INFLUX_CERT_PATH": "",
}


Expand Down
16 changes: 15 additions & 1 deletion jobbergate-agent-snap/snap/snapcraft.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: jobbergate-agent
base: core22
version: '0.4.0'
version: '0.5.0'
summary: The Jobbergate Agent snap
adopt-info: metadata
license: MIT
Expand Down Expand Up @@ -33,6 +33,20 @@ description: |

- write-submission-files: A boolean value (true, false) that indicates whether the agent should write submission files to disk. This is optional and defaults to false.

- influx-dsn: The DSN of the InfluxDB server that the agent will use to fetch job metrics data. It only allows the following schemes: 'influxdb', 'https+influxdb' and 'udp+influxdb'. This is optional and defaults to none.

- influx-pool-size: The size of the InfluxDB connection pool that the agent will use to fetch job metrics data. This is optional and defaults to 10.

- influx-ssl: A boolean value (true, false) that indicates whether the agent should use SSL to connect to the InfluxDB server. If true, `influx-cert-path` must be provided. This is optional and defaults to false.

- influx-verify-ssl: A boolean value (true, false) that indicates whether the agent should verify the SSL certificate of the InfluxDB server. This is optional and defaults to false.

- influx-timeout: The timeout in seconds that the agent will use when connecting to the InfluxDB server. This is optional and defaults to none.

- influx-udp-port: The UDP port that the agent will use to connect to the InfluxDB server. This is optional and defaults to 4444.

- influx-cert-path: The absolute path to the SSL certificate that the agent will use to connect to the InfluxDB server. This is optional and defaults to none.

For learning more about Jobbergate and how it can be used on Vantage, please visit https://docs.vantagehpc.io

grade: stable
Expand Down
2 changes: 2 additions & 0 deletions jobbergate-agent/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ This file keeps track of all notable changes to jobbergate-agent

## Unreleased

- Implement logic to retrieve job metrics data from InfluxDB and send it to the API. ([PENG-2457](https://sharing.clickup.com/t/h/c/18022949/PENG-2457/BU7UOA63B936N27))

## 5.4.0 -- 2024-11-18

- Changed auto-update task to reuse current scheduler instead of creating a new one
Expand Down
27 changes: 27 additions & 0 deletions jobbergate-agent/jobbergate_agent/clients/influx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Core module for defining the InfluxDB client."""

from influxdb import InfluxDBClient
from loguru import logger

from jobbergate_agent.settings import SETTINGS


def initialize_influx_client() -> None | InfluxDBClient:
"""Initialize the InfluxDB client."""
if SETTINGS.influx_integration_enabled:
logger.debug("InfluxDB integration is enabled. Initializing InfluxDB client...")
return InfluxDBClient.from_dsn(
str(SETTINGS.INFLUX_DSN),
pool_size=SETTINGS.INFLUX_POOL_SIZE,
ssl=SETTINGS.INFLUX_SSL,
verify_ssl=SETTINGS.INFLUX_VERIFY_SSL,
timeout=SETTINGS.INFLUX_TIMEOUT,
udp_port=SETTINGS.INFLUX_UDP_PORT,
cert=SETTINGS.INFLUX_CERT_PATH,
)
else:
logger.debug("InfluxDB integration is disabled")
return None


INFLUXDB_CLIENT = initialize_influx_client()
fschuch marked this conversation as resolved.
Show resolved Hide resolved
dusktreader marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 16 additions & 0 deletions jobbergate-agent/jobbergate_agent/jobbergate/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Literal

from auto_name_enum import AutoNameEnum, auto


Expand All @@ -6,3 +8,17 @@ class FileType(AutoNameEnum):

ENTRYPOINT = auto()
SUPPORT = auto()


INFLUXDB_MEASUREMENT = Literal[
fschuch marked this conversation as resolved.
Show resolved Hide resolved
"CPUFrequency",
"CPUTime",
"CPUUtilization",
"GPUMemMB",
"GPUUtilization",
"Pages",
"RSS",
"ReadMB",
"VMSize",
"WriteMB",
]
46 changes: 44 additions & 2 deletions jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from pathlib import Path
from typing import List, Optional
from typing import List, Optional, TypedDict

import pydantic
from pydantic import ConfigDict

from jobbergate_agent.jobbergate.constants import FileType
from jobbergate_agent.jobbergate.constants import FileType, INFLUXDB_MEASUREMENT


class JobScriptFile(pydantic.BaseModel, extra="ignore"):
Expand Down Expand Up @@ -78,3 +78,45 @@ class SlurmJobData(pydantic.BaseModel, extra="ignore"):
job_state: Optional[str] = None
job_info: Optional[str] = None
state_reason: Optional[str] = None


class InfluxDBMeasurement(TypedDict):
dusktreader marked this conversation as resolved.
Show resolved Hide resolved
"""
Map each entry in the list returned by `InfluxDBClient(...).get_list_measurements(...)`.
"""

name: INFLUXDB_MEASUREMENT


class InfluxDBMeasure(TypedDict):
dusktreader marked this conversation as resolved.
Show resolved Hide resolved
"""
Map each entry in the generator returned by InfluxDBClient(...).query(...).get_points().
"""

time: int
host: str
job: str
step: str
task: str
value: float
measurement: INFLUXDB_MEASUREMENT


class JobSubmissionMetricsMaxTimes(pydantic.BaseModel):
"""
Model for the max_times field of the JobSubmissionMetricsMaxResponse.
"""

max_time: int
node_host: str
step: int
task: int


class JobSubmissionMetricsMaxResponse(pydantic.BaseModel):
dusktreader marked this conversation as resolved.
Show resolved Hide resolved
"""
Model for the response of the `/jobbergate/job-submissions/agent/metrics/{job_submission_id}` endpoint.
"""

job_submission_id: int
max_times: list[JobSubmissionMetricsMaxTimes]
105 changes: 104 additions & 1 deletion jobbergate-agent/jobbergate_agent/jobbergate/update.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
import asyncio
import json
from itertools import chain
from typing import List

import msgpack
from jobbergate_core.tools.sbatch import InfoHandler
from loguru import logger

from jobbergate_agent.clients.cluster_api import backend_client as jobbergate_api_client
from jobbergate_agent.jobbergate.schemas import ActiveJobSubmission, SlurmJobData
from jobbergate_agent.clients.influx import INFLUXDB_CLIENT
from jobbergate_agent.jobbergate.schemas import (
ActiveJobSubmission,
SlurmJobData,
JobSubmissionMetricsMaxResponse,
InfluxDBMeasurement,
InfluxDBMeasure,
)
from jobbergate_agent.settings import SETTINGS
from jobbergate_agent.utils.exception import JobbergateApiError, SbatchError
from jobbergate_agent.utils.logging import log_error
from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT
from jobbergate_agent.utils.compute import aggregate_influx_measures


async def fetch_job_data(slurm_job_id: int, info_handler: InfoHandler) -> SlurmJobData:
Expand Down Expand Up @@ -70,6 +82,89 @@ async def update_job_data(
response.raise_for_status()


async def fetch_influx_data(
time: int, host: str, step: int, task: int, job: int, measurement: INFLUXDB_MEASUREMENT
) -> list[InfluxDBMeasure]:
"""
Fetch data from InfluxDB for a given host, step and task.
"""
query = f"""
SELECT * FROM {measurement} WHERE time > $time AND host = $host AND step = $step AND task = $task AND job = $job
"""
with JobbergateApiError.handle_errors("Failed to fetch data from InfluxDB", do_except=log_error):
assert INFLUXDB_CLIENT is not None
dusktreader marked this conversation as resolved.
Show resolved Hide resolved
params = dict(time=time, host=host, step=str(step), task=str(task), job=str(job))
logger.debug(f"Querying InfluxDB with: {query=}, {params=}")
result = INFLUXDB_CLIENT.query(query, bind_params=params, epoch="us")
logger.debug("Successfully fetched data from InfluxDB")
return [
fschuch marked this conversation as resolved.
Show resolved Hide resolved
InfluxDBMeasure(
time=point["time"],
host=point["host"],
job=point["job"],
step=point["step"],
task=point["task"],
value=point["value"],
measurement=measurement,
)
for point in result.get_points()
]


def fetch_influx_measurements() -> list[InfluxDBMeasurement]:
"""
Fetch measurements from InfluxDB.
"""
with JobbergateApiError.handle_errors("Failed to fetch measurements from InfluxDB", do_except=log_error):
logger.debug("Fetching measurements from InfluxDB")
assert INFLUXDB_CLIENT is not None
measurements: list[InfluxDBMeasurement] = INFLUXDB_CLIENT.get_list_measurements()
logger.debug(f"Fetched measurements from InfluxDB: {measurements=}")
return measurements


async def update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None:
"""Update job metrics for a job submission.

This function fetches the metrics from InfluxDB and sends to the API.
"""
with JobbergateApiError.handle_errors(
f"Could not update job metrics for slurm job {active_job_submittion.slurm_job_id} via the API",
do_except=log_error,
):
response = await jobbergate_api_client.get(
f"jobbergate/job-submissions/agent/metrics/{active_job_submittion.id}"
)
response.raise_for_status()
job_max_times = JobSubmissionMetricsMaxResponse(**response.json())

influx_measurements = fetch_influx_measurements()

tasks = (
fetch_influx_data(
job_max_time.max_time,
job_max_time.node_host,
job_max_time.step,
job_max_time.task,
active_job_submittion.slurm_job_id,
measurement["name"],
)
for job_max_time in job_max_times.max_times
for measurement in influx_measurements
)
results = await asyncio.gather(*list(tasks))
data_points = chain.from_iterable(results)
aggregated_data_points = aggregate_influx_measures(data_points)
dusktreader marked this conversation as resolved.
Show resolved Hide resolved
packed_data = msgpack.packb(aggregated_data_points)

response = await jobbergate_api_client.put(
f"jobbergate/job-submissions/agent/metrics/{active_job_submittion.id}",
content=packed_data,
headers={"Content-Type": "application/octet-stream"},
)
response.raise_for_status()


async def update_active_jobs() -> None:
"""
Update slurm job state for active jobs.
Expand All @@ -83,6 +178,14 @@ async def update_active_jobs() -> None:

skip = "skipping to next active job"
for active_job_submission in active_job_submissions:
if SETTINGS.influx_integration_enabled:
logger.debug(f"Updating job metrics for job_submission {active_job_submission.id}")
try:
await update_job_metrics(active_job_submission)
except Exception:
logger.error("Update job metrics failed... skipping for job data update")
pass

logger.debug(f"Fetching slurm job state of job_submission {active_job_submission.id}")

try:
Expand Down
32 changes: 31 additions & 1 deletion jobbergate-agent/jobbergate_agent/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Optional

import buzz
from pydantic import AnyHttpUrl, Field, ValidationError, model_validator
from pydantic import AnyHttpUrl, Field, ValidationError, model_validator, AnyUrl
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing_extensions import Self

Expand Down Expand Up @@ -60,6 +60,21 @@ class Settings(BaseSettings):
# Job submission settings
WRITE_SUBMISSION_FILES: bool = True

# InfluxDB settings for job metric collection
INFLUX_DSN: Optional[AnyUrl] = Field(
None, description="InfluxDB DSN. Only supports the schemes 'influxdb', 'https+influxdb' and 'udp+influxdb'"
)
INFLUX_POOL_SIZE: int = Field(10, ge=1, description="Number of InfluxDB connections to pool")
INFLUX_SSL: bool = Field(False, description="Use SSL for InfluxDB connection")
INFLUX_VERIFY_SSL: bool = Field(False, description="Verify SSL certificate for InfluxDB connection")
INFLUX_TIMEOUT: Optional[int] = Field(None, ge=1, description="Timeout for InfluxDB connection")
INFLUX_UDP_PORT: int = Field(4444, ge=1, le=65535, description="UDP port for InfluxDB connection")
INFLUX_CERT_PATH: Optional[Path] = Field(None, description="Path to InfluxDB certificate file")

@property
def influx_integration_enabled(self) -> bool:
return self.INFLUX_DSN is not None

@model_validator(mode="after")
def compute_extra_settings(self) -> Self:
"""
Expand All @@ -81,6 +96,21 @@ def compute_extra_settings(self) -> Self:
self.SINGLE_USER_SUBMITTER = self.X_SLURM_USER_NAME
return self

@model_validator(mode="after")
def validate_influxdb_settings(self) -> Self:
if self.influx_integration_enabled:
if self.INFLUX_SSL:
buzz.require_condition(
self.INFLUX_CERT_PATH is not None,
matheushent marked this conversation as resolved.
Show resolved Hide resolved
"INFLUX_CERT_PATH must be provided when INFLUX_SSL is enabled",
ValueError,
)

assert self.INFLUX_DSN is not None
dusktreader marked this conversation as resolved.
Show resolved Hide resolved
if self.INFLUX_DSN.scheme not in ["influxdb", "https+influxdb", "udp+influxdb"]:
raise ValueError("INFLUX_DSN scheme must be one of 'influxdb', 'https+influxdb' or 'udp+influxdb'")
return self

model_config = SettingsConfigDict(env_prefix="JOBBERGATE_AGENT_", env_file=_get_env_file(), extra="ignore")


Expand Down
Loading