Metric importer refactor #1035

Merged
20 commits merged into main from metric-importer-refactor on Dec 28, 2024
Commits
09c4375  Added resolution AVG to the measurements table (ArneTR, Dec 26, 2024)
8ca1e73  (feature): Added modular importer for metrics instead of inline in ru… (ArneTR, Dec 26, 2024)
38352b2  Added more measurement test data (ArneTR, Dec 26, 2024)
c048ba1  (tests): For phase stats and metrics importer (ArneTR, Dec 26, 2024)
1f9ba39  Rework metric provider processing mechanism and removed storing to DB… (ArneTR, Dec 27, 2024)
e502173  Added tests (ArneTR, Dec 27, 2024)
27ca35b  Merge branch 'main' into metric-importer-refactor (ArneTR, Dec 28, 2024)
d045544  (improvement): Added all powermetrics metrics to phase_stats to remov… (ArneTR, Dec 28, 2024)
c83f2a3  (improvement): Close StringIO buffer to prevent leaks if long runtime (ArneTR, Dec 28, 2024)
93dd25a  (consistency): containers is always required for function (ArneTR, Dec 28, 2024)
3f4bfc4  (improvement): _check_empty introduced to have no magic assumptions a… (ArneTR, Dec 28, 2024)
43c7551  (style): Typos and indents (ArneTR, Dec 28, 2024)
4a4b09f  (fix): Tests where missing assert (ArneTR, Dec 28, 2024)
db14522  (improvement): Reading twice reduced to one in test helper (ArneTR, Dec 28, 2024)
8651deb  (Tests): Fix Tests (ArneTR, Dec 28, 2024)
e92dfce  Merge branch 'main' into metric-importer-refactor (ArneTR, Dec 28, 2024)
91e7310  (fix): Duplicate name in phase_stats without functionality (ArneTR, Dec 28, 2024)
01d8835  (style): Namings and typos (ArneTR, Dec 28, 2024)
37780b0  Changed powermetrics file to non time underflow (ArneTR, Dec 28, 2024)
3f0a51d  (fix): Removed email column from tests also (ArneTR, Dec 28, 2024)
3 changes: 3 additions & 0 deletions docker/structure.sql
@@ -118,6 +118,9 @@ CREATE TABLE measurements (
detail_name text NOT NULL,
metric text NOT NULL,
value bigint NOT NULL,
resolution_avg DOUBLE PRECISION NOT NULL,
resolution_max DOUBLE PRECISION NOT NULL,
resolution_95p DOUBLE PRECISION NOT NULL,
Comment on lines +121 to +123
style: Consider adding indexes on resolution columns if they will be frequently queried for analysis or filtering
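For illustration only, a sketch of what such indexes could look like; the index names are assumptions and none of this is part of the PR. It would only pay off if these columns are regularly filtered or sorted on:

-- hypothetical indexes, not in this PR
CREATE INDEX idx_measurements_resolution_avg ON measurements (resolution_avg);
CREATE INDEX idx_measurements_resolution_max ON measurements (resolution_max);
CREATE INDEX idx_measurements_resolution_95p ON measurements (resolution_95p);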

unit text NOT NULL,
time bigint NOT NULL,
created_at timestamp with time zone DEFAULT now(),
57 changes: 57 additions & 0 deletions lib/metric_importer.py
@@ -0,0 +1,57 @@
from io import StringIO

from lib.db import DB
from metric_providers.network.connections.tcpdump.system.provider import generate_stats_string

def import_measurements_new(df, metric_name, run_id):

df['measurement_metric_id'] = None # prepare
detail_names = df[['detail_name', 'unit']].drop_duplicates()

for _, row in detail_names.iterrows():
measurement_metric_id = DB().fetch_one('''
INSERT INTO measurement_metrics (run_id, metric, detail_name, unit)
VALUES (%s, %s, %s, %s)
RETURNING id
''', params=(run_id, metric_name, row['detail_name'], row['unit']))[0]
df.loc[(df['detail_name'] == row['detail_name']) & (df['unit'] == row['unit']), 'measurement_metric_id'] = measurement_metric_id
Comment on lines +11 to +17
style: Consider batching the INSERT statements for better performance with many detail_names

ArneTR (Member, Author) replied:
@greptileai: but how do I do that, when I need to return the ID for every one?

greptileai replied:
You can use a VALUES clause with multiple rows and still get the returned IDs using unnest. Here's how:

INSERT INTO measurement_metrics (run_id, metric, detail_name, unit)
VALUES %s
RETURNING id, detail_name, unit

Then construct the values tuple list in Python and use psycopg's execute_values(). This will return all IDs matched with their detail_name and unit, which you can then use to update the dataframe in one go.

Would you like me to show you the specific code changes needed to implement this?
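For illustration, a minimal sketch of that approach, assuming psycopg2's execute_values is available and that a raw cursor can be obtained from the DB wrapper; the helper name and the cursor argument below are assumptions, not part of this PR:

from psycopg2.extras import execute_values

def insert_measurement_metrics_batched(cursor, run_id, metric_name, detail_names):
    # One row tuple per unique (detail_name, unit) pair from the dataframe
    rows = [(run_id, metric_name, row['detail_name'], row['unit'])
            for _, row in detail_names.iterrows()]
    # execute_values expands the single %s into a multi-row VALUES list
    returned = execute_values(
        cursor,
        'INSERT INTO measurement_metrics (run_id, metric, detail_name, unit) '
        'VALUES %s RETURNING id, detail_name, unit',
        rows,
        fetch=True  # collect the RETURNING rows
    )
    # Map (detail_name, unit) to the generated id so the dataframe can be updated in one pass
    return {(detail_name, unit): metric_id for metric_id, detail_name, unit in returned}

The caller could then map each row's (detail_name, unit) pair through that dictionary in one vectorised assignment instead of one df.loc update per pair.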


f = StringIO(df[['measurement_metric_id', 'value', 'time']]
.to_csv(index=False, header=False))
DB().copy_from(file=f, table='measurement_values', columns=['measurement_metric_id', 'value', 'time'], sep=',')
Comment on lines +19 to +21
style: Consider using a context manager (with statement) for the StringIO object to ensure proper cleanup
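For illustration, a sketch of the with-statement form this suggestion points at (same table and columns as the surrounding code; behaviour otherwise unchanged):

with StringIO(df[['measurement_metric_id', 'value', 'time']]
              .to_csv(index=False, header=False)) as f:
    # the buffer is closed automatically when the block exits, even on exceptions
    DB().copy_from(file=f, table='measurement_values',
                   columns=['measurement_metric_id', 'value', 'time'], sep=',')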

f.close()

def import_measurements(df, metric_name, run_id, containers=None):

if metric_name == 'network_connections_proxy_container_dockerproxy':

df['run_id'] = run_id
f = StringIO(df.to_csv(index=False, header=False))
DB().copy_from(file=f, table='network_intercepts', columns=df.columns, sep=',')
f.close()

elif metric_name == 'network_connections_tcpdump_system':
DB().query("""
UPDATE runs
SET logs= COALESCE(logs, '') || %s -- append
WHERE id = %s
""", params=(generate_stats_string(df), run_id))

else:

if 'container_id' in df.columns:
df = map_container_id_to_detail_name(df, containers)

df['run_id'] = run_id

f = StringIO(df.to_csv(index=False, header=False))
DB().copy_from(file=f, table='measurements', columns=df.columns, sep=',')
f.close()

def map_container_id_to_detail_name(df, containers):
df['detail_name'] = df.container_id
for container_id in containers:
df.loc[df.detail_name == container_id, 'detail_name'] = containers[container_id]['name']
df = df.drop('container_id', axis=1)

return df
25 changes: 11 additions & 14 deletions lib/phase_stats.py
@@ -10,7 +10,6 @@

from lib.global_config import GlobalConfig
from lib.db import DB
from lib import utils
from lib import error_helpers

def generate_csv_line(run_id, metric, detail_name, phase_name, value, value_type, max_value, min_value, unit):
@@ -23,7 +22,7 @@ def build_and_store_phase_stats(run_id, sci=None):
sci = {}

query = """
SELECT metric, unit, detail_name
SELECT metric, unit, detail_name, AVG(resolution_avg)
FROM measurements
WHERE run_id = %s
GROUP BY metric, unit, detail_name
@@ -37,17 +36,17 @@ def build_and_store_phase_stats(run_id, sci=None):


query = """
SELECT phases, measurement_config
SELECT phases
FROM runs
WHERE id = %s
"""
data = DB().fetch_one(query, (run_id, ))
phases = DB().fetch_one(query, (run_id, ))

if not data or not data[0] or not data[1]:
if not phases or not phases[0]:
error_helpers.log_error('Phases object was empty and no phase_stats could be created. This can happen for failed runs, but should be very rare ...', run_id=run_id)
return

phases, measurement_config = data # unpack
phases = phases[0]

csv_buffer = StringIO()

@@ -75,7 +74,7 @@ def build_and_store_phase_stats(run_id, sci=None):
csv_buffer.write(generate_csv_line(run_id, 'phase_time_syscall_system', '[SYSTEM]', f"{idx:03}_{phase['name']}", duration, 'TOTAL', None, None, 'us'))

# now we go through all metrics in the run and aggregate them
for (metric, unit, detail_name) in metrics: # unpack
for metric, unit, detail_name, resolution_avg in metrics: # unpack
# -- saved for future if I need lag time query
# WITH times as (
# SELECT id, value, time, (time - LAG(time) OVER (ORDER BY detail_name ASC, time ASC)) AS diff, unit
@@ -84,9 +83,6 @@ def build_and_store_phase_stats(run_id, sci=None):
# ORDER BY detail_name ASC, time ASC
# ) -- Backlog: if we need derivatives / integrations in the future

provider_name = metric.replace('_', '.') + '.provider.' + utils.get_pascal_case(metric) + 'Provider'
provider_resolution_in_ms = measurement_config['providers'][provider_name]['resolution']

results = DB().fetch_one(select_query,
(run_id, metric, detail_name, phase['start'], phase['end'], ))

@@ -121,10 +117,10 @@ def build_and_store_phase_stats(run_id, sci=None):
if metric in ('cpu_utilization_cgroup_container', ):
cpu_utilization_containers[detail_name] = avg_value

elif metric in ['network_io_cgroup_container', 'network_io_procfs_system', 'disk_io_procfs_system', 'disk_io_cgroup_container']:
elif metric in ['network_io_cgroup_container', 'network_io_procfs_system', 'disk_io_procfs_system', 'disk_io_cgroup_container', 'disk_io_bytesread_powermetrics_vm', 'disk_io_byteswritten_powermetrics_vm']:
# I/O values should be per second. However we have very different timing intervals.
# So we do not directly use the average here, as this would be the average per sampling frequency. We go through the duration
provider_conversion_factor_to_s = decimal.Decimal(provider_resolution_in_ms/1_000)
provider_conversion_factor_to_s = decimal.Decimal(resolution_avg/1_000_000)
csv_buffer.write(generate_csv_line(run_id, metric, detail_name, f"{idx:03}_{phase['name']}", avg_value/provider_conversion_factor_to_s, 'MEAN', max_value/provider_conversion_factor_to_s, min_value/provider_conversion_factor_to_s, f"{unit}/s"))

# we also generate a total line to see how much total data was processed
@@ -152,8 +148,9 @@ def build_and_store_phase_stats(run_id, sci=None):
machine_energy_phase = value_sum
machine_power_phase = power_avg

else:
error_helpers.log_error('Unmapped phase_stat found, using default', metric=metric, detail_name=detail_name, run_id=run_id)
else: # Default
if metric not in ('cpu_time_powermetrics_vm', ):
error_helpers.log_error('Unmapped phase_stat found, using default', metric=metric, detail_name=detail_name, run_id=run_id)
csv_buffer.write(generate_csv_line(run_id, metric, detail_name, f"{idx:03}_{phase['name']}", value_sum, 'TOTAL', max_value, min_value, unit))

# after going through detail metrics, create cumulated ones
59 changes: 51 additions & 8 deletions metric_providers/base.py
@@ -4,6 +4,7 @@
import subprocess
from io import StringIO
import pandas
from typing import final

from lib.system_checks import ConfigurationCheckError
from lib import process_helpers
@@ -97,18 +98,16 @@ def get_stderr(self):
def has_started(self):
return self._has_started

def check_monotonic(self, df):
def _check_monotonic(self, df):
if not df['time'].is_monotonic_increasing:
raise ValueError(f"Time from metric provider {self._metric_name} is not monotonic increasing")

def check_resolution_underflow(self, df):
def _check_resolution_underflow(self, df):
if self._unit in ['mJ', 'uJ', 'Hz', 'us']:
if (df['value'] <= 1).any():
raise ValueError(f"Data from metric provider {self._metric_name} is running into a resolution underflow. Values are <= 1 {self._unit}")



def read_metrics(self, run_id, containers=None): #pylint: disable=unused-argument
def _read_metrics(self): # can be overriden in child
with open(self._filename, 'r', encoding='utf-8') as file:
csv_data = file.read()

Expand All @@ -125,13 +124,57 @@ def read_metrics(self, run_id, containers=None): #pylint: disable=unused-argumen
if df.isna().any().any():
raise ValueError(f"Dataframe for {self._metric_name} contained NA values.")

return df

def _check_empty(self, df):
if df.empty:
raise RuntimeError(f"Metrics provider {self._metric_name} metrics log file was empty.")


def _parse_metrics(self, df): # can be overriden in child
df['detail_name'] = f"[{self._metric_name.split('_')[-1]}]" # default, can be overridden in child
return df

def _add_and_validate_resolution_and_jitter(self, df):
# DF can have many columns still. Since all of them might have induced a separate timing row
# we group by everything apart from time and value itself
# for most metric providers only detail_name and container_id should be present and differ though
excluded_columns = ['time', 'value']
grouping_columms = [col for col in df.columns if col not in excluded_columns]
df['effective_resolution'] = df.groupby(grouping_columms)['time'].diff()
df['resolution_max'] = df.groupby(grouping_columms)['effective_resolution'].transform('max')
df['resolution_avg'] = df.groupby(grouping_columms)['effective_resolution'].transform('mean')
df['resolution_95p'] = df.groupby(grouping_columms)['effective_resolution'].transform(lambda x: x.quantile(0.95))
df = df.drop('effective_resolution', axis=1)

if (resolution_95p := df['resolution_95p'].max()) >= self._resolution*1000*1.2:
raise RuntimeError(f"Resolution 95p was absurdly high: {resolution_95p} compared to base resolution of {self._resolution*1000}", df)

if (resolution_95p := df['resolution_95p'].min()) <= self._resolution*1000*0.8:
raise RuntimeError(f"Resolution 95p was absurdly low: {resolution_95p} compared to base resolution of {self._resolution*1000}", df)


return df

def _add_unit_and_metric(self, df): # can be overriden in child
df['unit'] = self._unit
df['metric'] = self._metric_name
df['run_id'] = run_id
return df

@final
def read_metrics(self): # should not be overriden

df = self._read_metrics() # is not always returning a data frame, but can in rare cases also return a list if no actual numeric measurements are captured

self._check_empty(df)

self._check_monotonic(df) # check must be made before data frame is potentially sorted in _parse_metrics
self._check_resolution_underflow(df)

df = self._parse_metrics(df)
df = self._add_unit_and_metric(df)

self.check_monotonic(df)
self.check_resolution_underflow(df)
df = self._add_and_validate_resolution_and_jitter(df)

return df

6 changes: 1 addition & 5 deletions metric_providers/cpu/energy/rapl/msr/component/provider.py
@@ -19,11 +19,7 @@ def check_system(self, check_command="default", check_error_message=None, check_
if not is_rapl_energy_filtering_deactivated():
raise MetricProviderConfigurationError('RAPL energy filtering is active and might skew results!')

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):

df['detail_name'] = df.package_id
df = df.drop('package_id', axis=1)
6 changes: 1 addition & 5 deletions metric_providers/cpu/frequency/sysfs/core/provider.py
@@ -14,11 +14,7 @@ def __init__(self, resolution, skip_check=False):
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):

df['detail_name'] = df.core_id
df = df.drop('core_id', axis=1)
13 changes: 0 additions & 13 deletions metric_providers/cpu/time/cgroup/container/provider.py
@@ -12,16 +12,3 @@ def __init__(self, resolution, skip_check=False):
current_dir=os.path.dirname(os.path.abspath(__file__)),
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df

df['detail_name'] = df.container_id
for container_id in containers:
df.loc[df.detail_name == container_id, 'detail_name'] = containers[container_id]['name']
df = df.drop('container_id', axis=1)

return df
15 changes: 1 addition & 14 deletions metric_providers/cpu/utilization/cgroup/container/provider.py
@@ -10,18 +10,5 @@ def __init__(self, resolution, skip_check=False):
resolution=resolution,
unit='Ratio',
current_dir=os.path.dirname(os.path.abspath(__file__)),
skip_check = skip_check,
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df

df['detail_name'] = df.container_id
for container_id in containers:
df.loc[df.detail_name == container_id, 'detail_name'] = containers[container_id]['name']
df = df.drop('container_id', axis=1)

return df
12 changes: 2 additions & 10 deletions metric_providers/disk/io/cgroup/container/provider.py
@@ -14,11 +14,8 @@ def __init__(self, resolution, skip_check=False):
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):
df = super()._parse_metrics(df) # sets detail_name

df = df.sort_values(by=['container_id', 'time'], ascending=True)

@@ -42,9 +39,4 @@ def read_metrics(self, run_id, containers=None):
df['value'] = df.value.astype(int)
df = df.drop(columns=['read_bytes','written_bytes', 'written_bytes_intervals', 'read_bytes_intervals']) # clean up

df['detail_name'] = df.container_id
for container_id in containers:
df.loc[df.detail_name == container_id, 'detail_name'] = containers[container_id]['name']
df = df.drop('container_id', axis=1)

return df
8 changes: 2 additions & 6 deletions metric_providers/disk/io/procfs/system/provider.py
@@ -15,11 +15,8 @@ def __init__(self, resolution, skip_check=False):
skip_check=skip_check,
)

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):
df['detail_name'] = df['device']

df = df.sort_values(by=['device', 'time'], ascending=True)

@@ -42,7 +39,6 @@ def read_metrics(self, run_id, containers=None):
df['blocksize'] = df['device'].apply(self.get_blocksize)
df['value'] = (df['read_sectors_intervals'] + df['written_sectors_intervals'])*df['blocksize']
df['value'] = df.value.astype(int)
df['detail_name'] = df['device']
df = df.drop(columns=['read_sectors','written_sectors', 'written_sectors_intervals', 'read_sectors_intervals', 'device', 'blocksize']) # clean up

return df
7 changes: 2 additions & 5 deletions metric_providers/gpu/energy/nvidia/smi/component/provider.py
@@ -18,11 +18,8 @@ def __init__(self, resolution, skip_check=False):
def check_system(self, check_command="default", check_error_message=None, check_parallel_provider=True):
super().check_system(check_command=['which', 'nvidia-smi'], check_error_message="nvidia-smi is not installed on the system")

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):
df = super()._parse_metrics(df) # sets detail_name

'''
Conversion to Joules
3 changes: 1 addition & 2 deletions metric_providers/lmsensors/abstract_provider.py
@@ -63,8 +63,7 @@ def check_system(self, check_command="default", check_error_message=None, check_
raise MetricProviderConfigurationError(f"{self._metric_name} provider could not be started.\nCannot find feature '{feature}' in the output section for chip starting with '{config_chip}' of the 'sensors' command.\n\nAre you running in a VM / cloud / shared hosting?\nIf so please disable the {self._metric_name} provider in the config.yml")


def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)
def _parse_metrics(self, df):

df['detail_name'] = df.sensor_name
df = df.drop('sensor_name', axis=1)
@@ -22,11 +22,7 @@ def check_system(self, check_command="default", check_error_message=None, check_
if not is_rapl_energy_filtering_deactivated():
raise MetricProviderConfigurationError('RAPL energy filtering is active and might skew results!')

def read_metrics(self, run_id, containers=None):
df = super().read_metrics(run_id, containers)

if df.empty:
return df
def _parse_metrics(self, df):

df['detail_name'] = df.dram_id
df = df.drop('dram_id', axis=1)