Skip to content

Commit

Permalink
main loop: log memory
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Feb 5, 2020
1 parent 43daa4d commit 2ba27c7
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 0 deletions.
125 changes: 125 additions & 0 deletions cylc/flow/main_loop/log_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import json
import math
from pathlib import Path
from time import time

try:
import matplotlib
matplotlib.use('Agg')
from matplotlib import pyplot as plt
PLT = True
except ModuleNotFoundError:
PLT = False

from pympler.asizeof import asized


# TODO: make this configurable in the global config
# Threshold in bytes: scheduler attributes smaller than this are omitted
# from each memory snapshot (and from the resulting plot).
MIN_SIZE = 10000


async def before(scheduler, state):
    """Initialise the snapshot list and record the first snapshot.

    Called once before the scheduler's main loop starts.
    """
    # start with an empty series, then record the baseline measurement
    state['data'] = []
    await during(scheduler, state)


async def during(scheduler, state):
    """Record one (timestamp, attribute-sizes) memory snapshot."""
    snapshot = _compute_sizes(scheduler, min_size=MIN_SIZE)
    state['data'].append((time(), snapshot))


async def after(scheduler, state):
    """Record a final snapshot, then dump and plot the collected data.

    Writes ``log_memory.json`` (always) and ``log_memory.pdf`` (when
    matplotlib is available) into the suite run directory.
    """
    await during(scheduler, state)
    run_dir = scheduler.suite_run_dir
    _dump(state['data'], run_dir)
    fields, times = _transpose(state['data'])
    title = f'cylc.flow.scheduler.Scheduler attrs > {MIN_SIZE / 1000}kb'
    _plot(fields, times, run_dir, title)


def _compute_sizes(obj, min_size=10000):
    """Return the sizes of the attributes of an object.

    Args:
        obj: Any object; its ``__dict__`` attributes are measured.
        min_size: Attributes smaller than this many bytes are dropped.

    Returns:
        dict: Mapping of attribute name to size in bytes.

    Raises:
        Exception: If no ``__dict__`` reference can be found on ``obj``.
    """
    size = asized(obj, detail=2)
    # locate the object's __dict__ amongst the measured references
    dict_ref = next(
        (ref for ref in size.refs if ref.name == '__dict__'),
        None
    )
    if dict_ref is None:
        raise Exception('Cannot find __dict__ reference')

    # item names look like "[K] name: ..." — strip the 4-char prefix
    # and anything after the colon
    return {
        item.name.split(':')[0][4:]: item.size
        for item in dict_ref.refs
        if item.size > min_size
    }


def _transpose(data):
"""Pivot data from snapshot to series oriented."""
all_keys = set()
for _, datum in data:
all_keys.update(datum.keys())

# sort keys by the size of the last checkpoint so that the fields
# get plotted from largest to smallest
all_keys = list(all_keys)
all_keys.sort(key=lambda x: data[-1][1].get(x, 0), reverse=True)

# extract data for each field, if not present
fields = {}
for key in all_keys:
fields[key] = [
datum.get(key, -1)
for _, datum in data
]

start_time = data[0][0]
times = [
timestamp - start_time
for timestamp, _ in data
]

return fields, times


def _dump(data, path):
json.dump(
data,
Path(path, 'log_memory.json').open('w+')
)
return True


def _plot(fields, times, path, title='Objects'):
    """Plot each field's memory usage over time to ``log_memory.pdf``.

    Args:
        fields: Mapping of field name to a list of sizes in bytes.
        times: List of snapshot times (seconds, relative to start).
        path: Directory to write the PDF into.
        title: Title for the figure.

    Returns:
        bool: ``True`` if a plot was written, ``False`` when matplotlib
        is unavailable or there are too few points to draw a line.
    """
    # need matplotlib and at least two points to draw anything useful
    if not PLT or len(times) < 2:
        return False

    figure, axis = plt.subplots(figsize=(10, 7.5))
    figure.suptitle(title)
    axis.set_xlabel('Time (s)')
    axis.set_ylabel('Memory (kb)')

    for field_name, sizes in fields.items():
        kb_series = [size / 1000 for size in sizes]
        axis.plot(times, kb_series, label=field_name)

    axis.legend(loc=0)

    # anchor both axes at zero
    _, x_max = axis.get_xlim()
    axis.set_xlim(0, x_max)
    _, y_max = axis.get_ylim()
    axis.set_ylim(0, y_max)

    plt.savefig(Path(path, 'log_memory.pdf'))
    return True
71 changes: 71 additions & 0 deletions cylc/flow/tests/main_loop/log_memory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from pathlib import Path
from unittest.mock import Mock

import pytest

from cylc.flow.main_loop.log_memory import (
_compute_sizes,
_transpose,
_dump,
_plot
)


def test_compute_sizes():
    """Test the interface for the calculation of instance attribute sizes."""
    keys = {
        'a': [],
        'b': 42,
        'c': 'beef wellington'
    }
    test_object = Mock(**keys)

    # no fields should be larger than 10kb
    assert _compute_sizes(test_object, 10000) == {}

    # all fields should be larger than 0kb
    ret = _compute_sizes(test_object, 0)
    # filter out the attributes Mock itself adds
    returned_keys = {
        key
        for key, value in ret.items()
        if not key.startswith('_')
        and key != 'method_calls'
    }
    assert returned_keys == set(keys)


@pytest.fixture()
def test_data():
    """Return example snapshot data: (timestamp, {field: size}) tuples.

    Note ``b`` is present only in the first snapshot, to exercise the
    missing-value handling in ``_transpose``.
    """
    return [
        (5, {'a': 1, 'b': 2, 'c': 3}),
        (6, {'a': 2, 'c': 4}),
        (7, {'a': 5, 'c': 2})
    ]


def test_transpose(test_data):
    """Test transposing the data from bin to series orientated."""
    fields, times = _transpose(test_data)
    # times are relative to the first snapshot
    assert times == [0, 1, 2]
    # the keys are sorted by their last entry
    assert fields == {
        'a': [1, 2, 5],
        'c': [3, 4, 2],
        'b': [2, -1, -1]  # missing values become -1
    }


def test_dump(test_data, tmp_path):
    """Ensure the data is serialiseable."""
    _dump(test_data, tmp_path)
    # exactly one file should have been written: the JSON dump
    expected = Path(tmp_path, 'log_memory.json')
    assert list(tmp_path.iterdir()) == [expected]


def test_plot(test_data, tmp_path):
    """Ensure the plotting mechanism doesn't raise errors."""
    fields, times = _transpose(test_data)
    _plot(fields, times, tmp_path)
    # exactly one file should have been written: the PDF plot
    expected = Path(tmp_path, 'log_memory.pdf')
    assert list(tmp_path.iterdir()) == [expected]
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,4 @@ main_loop =
health_check = cylc.flow.main_loop.health_check
auto_restart = cylc.flow.main_loop.auto_restart
log_data_store = cylc.flow.main_loop.log_data_store
log_memory = cylc.flow.main_loop.log_memory

0 comments on commit 2ba27c7

Please sign in to comment.