Skip to content

Commit

Permalink
Add/python parser generic (#60)
Browse files Browse the repository at this point in the history
* preparing to run osu benchmarks experiments

we need to have a generic parser to allow for saving
the complete, raw logs. The parser is good, but we can
run it after - I would rather be conservative and get
the entire log vs have some bug with parsing and miss
something.

Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch authored Sep 15, 2023
1 parent e78597d commit 6368608
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 20 deletions.
1 change: 1 addition & 0 deletions sdk/python/v1alpha1/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and **Merged pull requests**. Critical items to know are:
The versions coincide with releases on pip. Only major versions will be released as tags on Github.

## [0.0.x](https://github.com/converged-computing/metrics-operator/tree/main) (0.0.x)
- Allow getting raw logs for any metric (without parser) (0.0.19)
- Refactor of structure of Operator and addition of metrics (0.0.18)
- Add wait for delete function to python parser (0.0.17)
- LAMMPS python parser (0.0.16)
Expand Down
22 changes: 15 additions & 7 deletions sdk/python/v1alpha1/metricsoperator/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,22 @@ def __init__(self, yaml_file):
self.spec = utils.read_yaml(self.yaml_file)
config.load_kube_config()

def watch(self):
def watch(self, raw_logs=False, pod_prefix=None, container_name=None):
"""
Wait for (and yield parsed) metric logs.
"""
if raw_logs and not pod_prefix:
raise ValueError("You must provide a pod_prefix to ask for raw logs.")

for metric in self.spec["spec"]["metrics"]:
parser = mutils.get_metric(metric["name"])(self.spec)
if raw_logs:
parser = mutils.get_metric()(self.spec, container_name=container_name)
else:
parser = mutils.get_metric(metric["name"])(
self.spec, container_name=container_name
)
print("Watching %s" % metric["name"])
for pod, container in parser.logging_containers():
for pod, container in parser.logging_containers(pod_prefix=pod_prefix):
yield parser.parse(pod=pod, container=container)

def create(self):
Expand Down Expand Up @@ -65,7 +73,7 @@ def namespace(self):
def name(self):
return self.spec["metadata"]["name"]

def delete(self):
def delete(self, pod_prefix=None):
"""
Delete the associated YAML file.
"""
Expand All @@ -77,14 +85,14 @@ def delete(self):
plural=self.plural,
name=self.name,
)
self.wait_for_delete()
self.wait_for_delete(pod_prefix)
return result

def wait_for_delete(self):
def wait_for_delete(self, pod_prefix=None):
"""
Wait for pods to be gone (deleted)
"""
for metric in self.spec["spec"]["metrics"]:
parser = mutils.get_metric(metric["name"])(self.spec)
print("Watching %s for deletion" % metric["name"])
parser.wait_for_delete()
parser.wait_for_delete(pod_prefix=pod_prefix)
6 changes: 4 additions & 2 deletions sdk/python/v1alpha1/metricsoperator/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)

import metricsoperator.metrics.app as apps
import metricsoperator.metrics.base as base
import metricsoperator.metrics.network as network
import metricsoperator.metrics.perf as perf
import metricsoperator.metrics.storage as storage
Expand All @@ -17,11 +18,12 @@
}


def get_metric(name):
def get_metric(name=None):
"""
Get a named metric parser.
"""
metric = metrics.get(name)
# If we don't have a matching metric, return base (for raw logs)
if not metric:
raise ValueError(f"Metric {name} does not have a known parser")
return base.MetricBase
return metric
46 changes: 36 additions & 10 deletions sdk/python/v1alpha1/metricsoperator/metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class MetricBase:
collection_end = "METRICS OPERATOR COLLECTION END"
metadata_start = "METADATA START"
metadata_end = "METADATA END"
container_name = None

def __init__(self, spec=None, **kwargs):
"""
Expand All @@ -23,6 +24,10 @@ def __init__(self, spec=None, **kwargs):
self.spec = spec
self._core_v1 = kwargs.get("core_v1_api")

# If we don't have a default container name...
if not self.container_name:
self.container_name = kwargs.get("container_name") or "launcher"

# Load kubeconfig on Metricbase init only
if self.spec is not None:
config.load_kube_config()
Expand Down Expand Up @@ -55,6 +60,14 @@ def parse(self, pod, container):
)
return self.parse_log(lines)

def parse_log(self, lines):
"""
If the parser doesn't have anything, just return the lines
"""
# Get the log metadata, split lines by newline so not so hefty a log!
metadata = self.get_log_metadata(lines)
return {"data": lines.split("\n"), "metadata": metadata, "spec": self.spec}

@property
def core_v1(self):
"""
Expand All @@ -69,12 +82,14 @@ def core_v1(self):
self._core_v1 = core_v1_api.CoreV1Api()
return self._core_v1

def logging_containers(self, namespace=None, states=None, retry_seconds=5):
def logging_containers(
self, namespace=None, states=None, retry_seconds=5, pod_prefix=None
):
"""
Return list of containers intended to get logs from
"""
containers = []
pods = self.wait(namespace, states, retry_seconds)
pods = self.wait(namespace, states, retry_seconds, pod_prefix=pod_prefix)
container_name = getattr(self, "container_name", self.container)
print(f"Looking for container name {container_name}...")
for pod in pods.items:
Expand All @@ -90,17 +105,27 @@ def logging_containers(self, namespace=None, states=None, retry_seconds=5):
)
return containers

def wait(self, namespace=None, states=None, retry_seconds=5):
def get_pod_prefix(self, pod_prefix=None):
"""
Return the default or a custom pod prefix.
"""
pod_prefix = pod_prefix or getattr(self, "pod_prefix", None)
if not pod_prefix:
raise ValueError("A pod prefix 'pod_prefix' is required to wait for pods.")
return pod_prefix

def wait(self, namespace=None, states=None, retry_seconds=5, pod_prefix=None):
"""
Wait for one or more pods of interest to be done.
This assumes creation or a consistent size of pod getting to a
particular state. If looking for Termination -> gone, use
wait_for_delete.
"""
pod_prefix = self.get_pod_prefix(pod_prefix)
namespace = namespace or self.namespace
print(f"Looking for prefix {self.pod_prefix} in namespace {namespace}")
pod_list = self.get_pods(namespace, self.pod_prefix)
print(f"Looking for prefix {pod_prefix} in namespace {namespace}")
pod_list = self.get_pods(namespace, pod_prefix)
size = len(pod_list.items)

# We only want logs when they are completed
Expand All @@ -111,7 +136,7 @@ def wait(self, namespace=None, states=None, retry_seconds=5):
ready = set()
while len(ready) != size:
print(f"{len(ready)} pods are ready, out of {size}")
pod_list = self.get_pods(name=self.pod_prefix, namespace=namespace)
pod_list = self.get_pods(name=pod_prefix, namespace=namespace)

for pod in pod_list.items:
print(f"{pod.metadata.name} is in phase {pod.status.phase}")
Expand All @@ -126,16 +151,17 @@ def wait(self, namespace=None, states=None, retry_seconds=5):
print(f'All pods are in states "{states}"')
return pod_list

def wait_for_delete(self, namespace=None, retry_seconds=5):
def wait_for_delete(self, namespace=None, retry_seconds=5, pod_prefix=None):
"""
Wait for one or more pods of interest to be gone
"""
pod_prefix = self.get_pod_prefix(pod_prefix)
namespace = namespace or self.namespace
print(f"Looking for prefix {self.pod_prefix} in namespace {namespace}")
pod_list = self.get_pods(namespace, name=self.pod_prefix)
print(f"Looking for prefix {pod_prefix} in namespace {namespace}")
pod_list = self.get_pods(namespace, name=pod_prefix)
while len(pod_list.items) != 0:
print(f"{len(pod_list.items)} pods exist, waiting for termination.")
pod_list = self.get_pods(name=self.pod_prefix, namespace=namespace)
pod_list = self.get_pods(name=pod_prefix, namespace=namespace)
time.sleep(retry_seconds)
print("All pods are terminated.")

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/v1alpha1/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
if __name__ == "__main__":
setup(
name="metricsoperator",
version="0.0.18",
version="0.0.19",
author="Vanessasaurus",
author_email="vsoch@users.noreply.github.com",
maintainer="Vanessasaurus",
Expand Down

0 comments on commit 6368608

Please sign in to comment.