Skip to content

Commit

Permalink
Resource Allocation data in Redis (#3430)
Browse files Browse the repository at this point in the history
* Update to DB qiita.slurm_resource_allocations

* qiita-cron-job initialize-resource-allocations-redis

* Populate redis with resource-allocation data

* Removed changes to qiita_pet

This pull request should only contain changes with uploading data to redis. I accidentally commited some changes to qiita_pet here.

* Minor changes

* Updates to Antonio’s coments

* Update meta_util.py
  • Loading branch information
Gossty authored Sep 2, 2024
1 parent e34df41 commit 6f1a3d4
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 19 deletions.
107 changes: 106 additions & 1 deletion qiita_db/meta_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@
from re import sub
from json import loads, dump, dumps

from qiita_db.util import create_nested_path
from qiita_db.util import create_nested_path, retrieve_resource_data
from qiita_db.util import resource_allocation_plot
from qiita_core.qiita_settings import qiita_config, r_client
from qiita_core.configuration_manager import ConfigurationManager
import qiita_db as qdb

# global constant list used in resource_allocation_page
COLUMNS = [
"sName", "sVersion", "cID", "cName", "processing_job_id",
"parameters", "samples", "columns", "input_size", "extra_info",
"MaxRSSRaw", "ElapsedRaw", "Start", "node_name", "node_model"]


def _get_data_fpids(constructor, object_id):
"""Small function for getting filepath IDS associated with data object
Expand Down Expand Up @@ -546,3 +553,101 @@ def generate_plugin_releases():
# important to "flush" variables to avoid errors
r_client.delete(redis_key)
f(redis_key, v)


def get_software_commands(active):
software_list = [s for s in qdb.software.Software.iter(active=active)]
software_commands = defaultdict(lambda: defaultdict(list))

for software in software_list:
sname = software.name
sversion = software.version
commands = software.commands

for command in commands:
software_commands[sname][sversion].append(command.name)

return dict(software_commands)


def update_resource_allocation_redis(active=True):
"""Updates redis with plots and information about current software.
Parameters
----------
active: boolean, optional
Defaults to True. Should only be False when testing.
"""
time = datetime.now().strftime('%m-%d-%y')
scommands = get_software_commands(active)
redis_key = 'resources:commands'
r_client.set(redis_key, str(scommands))

for sname, versions in scommands.items():
for version, commands in versions.items():
for cname in commands:

col_name = "samples * columns"
df = retrieve_resource_data(cname, sname, version, COLUMNS)
if len(df) == 0:
continue

fig, axs = resource_allocation_plot(df, cname, sname, col_name)
titles = [0, 0]
images = [0, 0]

# Splitting 1 image plot into 2 separate for better layout.
for i, ax in enumerate(axs):
titles[i] = ax.get_title()
ax.set_title("")
# new_fig, new_ax – copy with either only memory plot or
# only time
new_fig = plt.figure()
new_ax = new_fig.add_subplot(111)

scatter_data = ax.collections[0]
new_ax.scatter(scatter_data.get_offsets()[:, 0],
scatter_data.get_offsets()[:, 1],
s=scatter_data.get_sizes(), label="data")

line = ax.lines[0]
new_ax.plot(line.get_xdata(), line.get_ydata(),
linewidth=1, color='orange')

if len(ax.collections) > 1:
failure_data = ax.collections[1]
new_ax.scatter(failure_data.get_offsets()[:, 0],
failure_data.get_offsets()[:, 1],
color='red', s=3, label="failures")

new_ax.set_xscale('log')
new_ax.set_yscale('log')
new_ax.set_xlabel(ax.get_xlabel())
new_ax.set_ylabel(ax.get_ylabel())
new_ax.legend(loc='upper left')

new_fig.tight_layout()
plot = BytesIO()
new_fig.savefig(plot, format='png')
plot.seek(0)
img = 'data:image/png;base64,' + quote(
b64encode(plot.getvalue()).decode('ascii'))
images[i] = img
plt.close(new_fig)
plt.close(fig)

# SID, CID, col_name
values = [
("img_mem", images[0], r_client.set),
("img_time", images[1], r_client.set),
('time', time, r_client.set),
("title_mem", titles[0], r_client.set),
("title_time", titles[1], r_client.set)
]

for k, v, f in values:
redis_key = 'resources$#%s$#%s$#%s$#%s:%s' % (
cname, sname, version, col_name, k)
r_client.delete(redis_key)
f(redis_key, v)
21 changes: 21 additions & 0 deletions qiita_db/test/test_meta_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,27 @@ def test_generate_plugin_releases(self):
'-', '').replace(':', '').replace(' ', '-')
self.assertEqual(tgz_obs, [time])

def test_update_resource_allocation_redis(self):
cname = "Split libraries FASTQ"
sname = "QIIMEq2"
col_name = "samples * columns"
version = "1.9.1"
qdb.meta_util.update_resource_allocation_redis(False)
title_mem_str = 'resources$#%s$#%s$#%s$#%s:%s' % (
cname, sname, version, col_name, 'title_mem')
title_mem = str(r_client.get(title_mem_str))
self.assertTrue(
"model: "
"k * log(x) + "
"b * log(x)^2 + "
"a * log(x)^3" in title_mem
)

title_time_str = 'resources$#%s$#%s$#%s$#%s:%s' % (
cname, sname, version, col_name, 'title_time')
title_time = str(r_client.get(title_time_str))
self.assertTrue("model: a + b + log(x) * k" in title_time)


if __name__ == '__main__':
main()
21 changes: 11 additions & 10 deletions qiita_db/test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1311,8 +1311,9 @@ def test_quick_mounts_purge(self):

class ResourceAllocationPlotTests(TestCase):
def setUp(self):
self.CNAME = "Split libraries FASTQ"
self.SNAME = "QIIMEq2"
self.cname = "Split libraries FASTQ"
self.sname = "QIIMEq2"
self.version = "1.9.1"
self.col_name = 'samples * columns'
self.columns = [
"sName", "sVersion", "cID", "cName", "processing_job_id",
Expand All @@ -1321,13 +1322,13 @@ def setUp(self):

# df is a dataframe that represents a table with columns specified in
# self.columns
self.df = qdb.util._retrieve_resource_data(
self.CNAME, self.SNAME, self.columns)
self.df = qdb.util.retrieve_resource_data(
self.cname, self.sname, self.version, self.columns)

def test_plot_return(self):
# check the plot returns correct objects
fig1, axs1 = qdb.util.resource_allocation_plot(
self.df, self.CNAME, self.SNAME, self.col_name)
self.df, self.cname, self.sname, self.col_name)
self.assertIsInstance(
fig1, Figure,
"Returned object fig1 is not a Matplotlib Figure")
Expand All @@ -1338,13 +1339,13 @@ def test_plot_return(self):

def test_minimize_const(self):
self.df = self.df[
(self.df.cName == self.CNAME) & (self.df.sName == self.SNAME)]
(self.df.cName == self.cname) & (self.df.sName == self.sname)]
self.df.dropna(subset=['samples', 'columns'], inplace=True)
self.df[self.col_name] = self.df.samples * self.df['columns']
fig, axs = plt.subplots(ncols=2, figsize=(10, 4), sharey=False)

bm, options = qdb.util._resource_allocation_plot_helper(
self.df, axs[0], self.CNAME, self.SNAME, 'MaxRSSRaw',
self.df, axs[0], self.cname, self.sname, 'MaxRSSRaw',
qdb.util.MODELS_MEM, self.col_name)
# check that the algorithm chooses correct model for MaxRSSRaw and
# has 0 failures
Expand All @@ -1366,7 +1367,7 @@ def test_minimize_const(self):
# check that the algorithm chooses correct model for ElapsedRaw and
# has 1 failure
bm, options = qdb.util._resource_allocation_plot_helper(
self.df, axs[1], self.CNAME, self.SNAME, 'ElapsedRaw',
self.df, axs[1], self.cname, self.sname, 'ElapsedRaw',
qdb.util.MODELS_TIME, self.col_name)
k, a, b = options.x
failures_df = qdb.util._resource_allocation_failures(
Expand Down Expand Up @@ -1422,8 +1423,8 @@ def test_db_update(self):
qdb.util.update_resource_allocation_table(test=test_data)

for curr_cname, ids in types.items():
updated_df = qdb.util._retrieve_resource_data(
curr_cname, self.SNAME, self.columns)
updated_df = qdb.util.retrieve_resource_data(
curr_cname, self.sname, self.version, self.columns)
updated_ids_set = set(updated_df['processing_job_id'])
previous_ids_set = set(self.df['processing_job_id'])
for id in ids:
Expand Down
44 changes: 36 additions & 8 deletions qiita_db/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,27 @@
MODELS_TIME = [time_model1, time_model2, time_model3, time_model4]


def get_model_name(model):
if model == mem_model1:
return "k * log(x) + x * a + b"
elif model == mem_model2:
return "k * log(x) + b * log(x)^2 + a"
elif model == mem_model3:
return "k * log(x) + b * log(x)^2 + a * log(x)^3"
elif model == mem_model4:
return "k * log(x) + b * log(x)^2 + a * log(x)^2.5"
elif model == time_model1:
return "a + b + log(x) * k"
elif model == time_model2:
return "a + b * x + log(x) * k"
elif model == time_model3:
return "a + b * log(x)^2 + log(x) * k"
elif model == time_model4:
return "a * log(x)^3 + b * log(x)^2 + log(x) * k"
else:
return "Unknown model"


def scrub_data(s):
r"""Scrubs data fields of characters not allowed by PostgreSQL
Expand Down Expand Up @@ -2381,7 +2402,7 @@ def resource_allocation_plot(df, cname, sname, col_name):
return fig, axs


def _retrieve_resource_data(cname, sname, columns):
def retrieve_resource_data(cname, sname, version, columns):
with qdb.sql_connection.TRN:
sql = """
SELECT
Expand Down Expand Up @@ -2411,9 +2432,10 @@ def _retrieve_resource_data(cname, sname, columns):
ON pr.processing_job_id = sra.processing_job_id
WHERE
sc.name = %s
AND s.name = %s;
AND s.name = %s
AND s.version = %s
"""
qdb.sql_connection.TRN.add(sql, sql_args=[cname, sname])
qdb.sql_connection.TRN.add(sql, sql_args=[cname, sname, version])
res = qdb.sql_connection.TRN.execute_fetchindex()
df = pd.DataFrame(res, columns=columns)
return df
Expand Down Expand Up @@ -2482,15 +2504,18 @@ def _resource_allocation_plot_helper(
y_plot = best_model(x_plot, k, a, b)
ax.plot(x_plot, y_plot, linewidth=1, color='orange')

cmin_value = min(y_plot)
cmax_value = max(y_plot)

maxi = naturalsize(df[curr].max(), gnu=True) if curr == "MaxRSSRaw" else \
timedelta(seconds=float(df[curr].max()))
cmax = naturalsize(max(y_plot), gnu=True) if curr == "MaxRSSRaw" else \
timedelta(seconds=float(max(y_plot)))
cmax = naturalsize(cmax_value, gnu=True) if curr == "MaxRSSRaw" else \
str(timedelta(seconds=round(cmax_value, 2))).rstrip('0').rstrip('.')

mini = naturalsize(df[curr].min(), gnu=True) if curr == "MaxRSSRaw" else \
timedelta(seconds=float(df[curr].min()))
cmin = naturalsize(min(y_plot), gnu=True) if curr == "MaxRSSRaw" else \
timedelta(seconds=float(min(y_plot)))
cmin = naturalsize(cmin_value, gnu=True) if curr == "MaxRSSRaw" else \
str(timedelta(seconds=round(cmin_value, 2))).rstrip('0').rstrip('.')

x_plot = np.array(df[col_name])
failures_df = _resource_allocation_failures(
Expand All @@ -2500,7 +2525,10 @@ def _resource_allocation_plot_helper(
ax.scatter(failures_df[col_name], failures_df[curr], color='red', s=3,
label="failures")

ax.set_title(f'{cname}: {sname}\n real: {mini} || {maxi}\n'
ax.set_title(
f'k||a||b: {k}||{a}||{b}\n'
f'model: {get_model_name(best_model)}\n'
f'real: {mini} || {maxi}\n'
f'calculated: {cmin} || {cmax}\n'
f'failures: {failures}')
ax.legend(loc='upper left')
Expand Down
1 change: 1 addition & 0 deletions scripts/all-qiita-cron-job
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ qiita-cron-job empty-trash-upload-folder
qiita-cron-job generate-biom-and-metadata-release
qiita-cron-job purge-filepaths
qiita-cron-job update-redis-stats
qiita-cron-job update-resource-allocation-redis
qiita-cron-job generate-plugin-releases
qiita-cron-job purge-json-web-tokens
6 changes: 6 additions & 0 deletions scripts/qiita-cron-job
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ from qiita_db.util import (
quick_mounts_purge as qiita_quick_mounts_purge)
from qiita_db.meta_util import (
update_redis_stats as qiita_update_redis_stats,
update_resource_allocation_redis as qiita_update_resource_allocation_redis,
generate_biom_and_metadata_release as
qiita_generate_biom_and_metadata_release,
generate_plugin_releases as qiita_generate_plugin_releases)
Expand Down Expand Up @@ -48,6 +49,11 @@ def update_redis_stats():
qiita_update_redis_stats()


@commands.command()
def update_resource_allocation_redis():
qiita_update_resource_allocation_redis()


@commands.command()
def generate_biom_and_metadata_release():
qiita_generate_biom_and_metadata_release('public')
Expand Down

0 comments on commit 6f1a3d4

Please sign in to comment.