Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG: Multiple values for 'n_jobs' or 'threads' #18

Merged
merged 20 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint-build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ jobs:
- uses: qiime2/action-library-packaging@alpha1
with:
plugin-name: q2-diversity-lib
additional-tests: pytest --pyargs q2_diversity_lib
additional-tests: pytest --pyargs q2_diversity_lib --capture=tee-sys
47 changes: 21 additions & 26 deletions q2_diversity_lib/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def _drop_undefined_samples(counts: np.ndarray, sample_ids: np.ndarray,

@decorator
def _disallow_empty_tables(wrapped_function, *args, **kwargs):
bound_signature = signature(wrapped_function).bind(*args, **kwargs)
table = bound_signature.arguments.get('table')
bound_arguments = signature(wrapped_function).bind(*args, **kwargs)
table = bound_arguments.arguments.get('table')
if table is None:
raise TypeError("The wrapped function has no parameter 'table'")

Expand All @@ -55,24 +55,19 @@ def _disallow_empty_tables(wrapped_function, *args, **kwargs):

@decorator
def _validate_requested_cpus(wrapped_function, *args, **kwargs):
bound_signature = signature(wrapped_function).bind(*args, **kwargs)
bound_signature.apply_defaults()
bound_arguments = signature(wrapped_function).bind(*args, **kwargs)
bound_arguments.apply_defaults()
b_a_arguments = bound_arguments.arguments

# Handle duplicate param names
if all(params in bound_signature.arguments
for params in ['n_jobs', 'threads']):
if 'n_jobs' in b_a_arguments and 'threads' in b_a_arguments:
raise TypeError("Duplicate parameters: The _validate_requested_cpus "
"decorator may not be applied to callables with both "
"'n_jobs' and 'threads' parameters. Do you really need"
" both?")

# Handle cpu requests coming from different parameter names
if 'n_jobs' in bound_signature.arguments:
elif 'n_jobs' in b_a_arguments:
param_name = 'n_jobs'
cpus_requested = bound_signature.arguments[param_name]
elif 'threads' in bound_signature.arguments:
elif 'threads' in b_a_arguments:
param_name = 'threads'
cpus_requested = bound_signature.arguments[param_name]
else:
raise TypeError("The _validate_requested_cpus decorator may not be"
" applied to callables without an 'n_jobs' or "
Expand All @@ -81,21 +76,21 @@ def _validate_requested_cpus(wrapped_function, *args, **kwargs):
# If `Process.cpu_affinity` unavailable on system, fall back
# https://psutil.readthedocs.io/en/latest/index.html#psutil.cpu_count
try:
cpus = len(psutil.Process().cpu_affinity())
cpus_available = len(psutil.Process().cpu_affinity())
except AttributeError:
cpus = psutil.cpu_count(logical=False)
cpus_available = psutil.cpu_count(logical=False)

if isinstance(cpus_requested, int) and cpus_requested > cpus:
raise ValueError(f"The value passed to '{param_name}' cannot exceed "
f"the number of processors ({cpus}) available to "
"the system.")
cpus_requested = b_a_arguments[param_name]

if cpus_requested == 'auto':
ChrisKeefe marked this conversation as resolved.
Show resolved Hide resolved
# remove 'auto' from args to prevent 'multiple values' TypeError...
argslist = list(args)
argslist.remove('auto')
return_args = tuple(argslist)
# ...then inject number of available cpus
return wrapped_function(*return_args, **kwargs, **{param_name: cpus})
# mutate bound_arguments.arguments 'auto' to the requested # of cpus...
b_a_arguments[param_name] = cpus_available
# ...and update cpus requested to prevent TypeError
cpus_requested = cpus_available

return wrapped_function(*args, **kwargs)
if cpus_requested > cpus_available:
raise ValueError(f"The value passed to '{param_name}' cannot exceed "
f"the number of processors ({cpus_available}) "
"available to the system.")

return wrapped_function(*bound_arguments.args, **bound_arguments.kwargs)
18 changes: 9 additions & 9 deletions q2_diversity_lib/tests/test_beta.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------

import numpy as np
import numpy.testing as npt
import biom
Expand Down Expand Up @@ -238,9 +237,16 @@ def setUp(self):
p_a_table_fp = self.get_data_path('two_feature_p_a_table.biom')
self.p_a_table_as_BIOMV210Format = BIOMV210Format(p_a_table_fp,
mode='r')
self.table_as_artifact = Artifact.import_data(
'FeatureTable[Frequency]', self.table_as_BIOMV210Format)

tree_fp = self.get_data_path('three_feature.tree')
self.tree_as_NewickFormat = NewickFormat(tree_fp, mode='r')
self.tree_as_artifact = Artifact.import_data(
'Phylogeny[Rooted]', self.tree_as_NewickFormat)

self.unweighted_unifrac_thru_framework = self.plugin.actions[
'unweighted_unifrac']

def test_method(self):
actual = unweighted_unifrac(self.table_as_BIOMV210Format,
Expand All @@ -266,14 +272,8 @@ def test_accepted_types_have_consistent_behavior(self):
self.expected[id1, id2])

def test_does_it_run_through_framework(self):
unweighted_unifrac_thru_framework = self.plugin.actions[
'unweighted_unifrac']
table_as_artifact = Artifact.import_data(
'FeatureTable[Frequency]', self.table_as_BIOMV210Format)
tree_as_artifact = Artifact.import_data(
'Phylogeny[Rooted]', self.tree_as_NewickFormat)
unweighted_unifrac_thru_framework(table_as_artifact,
tree_as_artifact)
self.unweighted_unifrac_thru_framework(self.table_as_artifact,
self.tree_as_artifact)
# If we get here, then it ran without error
self.assertTrue(True)

Expand Down
65 changes: 58 additions & 7 deletions q2_diversity_lib/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------

import unittest.mock as mock
from unittest.mock import MagicMock
from unittest import mock

import numpy as np
import biom
import psutil

from qiime2 import Artifact
from qiime2.plugin.testing import TestPluginBase
from q2_types.feature_table import BIOMV210Format
from q2_types.tree import NewickFormat
Expand Down Expand Up @@ -96,13 +96,29 @@ def function_w_duplicate_params(n_jobs=3, threads=2):
pass
self.function_w_both = function_w_duplicate_params

valid_table_fp = self.get_data_path('two_feature_table.biom')
self.valid_table_as_BIOMV210Format = BIOMV210Format(valid_table_fp,
mode='r')
self.valid_table = biom.load_table(valid_table_fp)
self.jaccard_thru_framework = self.plugin.actions['jaccard']
self.unweighted_unifrac_thru_framework = self.plugin.actions[
'unweighted_unifrac']

two_feature_table_fp = self.get_data_path('two_feature_table.biom')
self.two_feature_table = biom.load_table(two_feature_table_fp)
self.two_feature_table_as_BIOMV210Format = BIOMV210Format(
two_feature_table_fp, mode='r')
self.two_feature_table_as_artifact = Artifact.import_data(
'FeatureTable[Frequency]', two_feature_table_fp)

larger_table_fp = self.get_data_path('crawford.biom')
self.larger_table_as_artifact = Artifact.import_data(
'FeatureTable[Frequency]', larger_table_fp)

valid_tree_fp = self.get_data_path('three_feature.tree')
self.valid_tree_as_NewickFormat = NewickFormat(valid_tree_fp, mode='r')
self.valid_tree_as_artifact = Artifact.import_data(
'Phylogeny[Rooted]', valid_tree_fp)

larger_tree_fp = self.get_data_path('crawford.nwk')
self.larger_tree_as_artifact = Artifact.import_data(
'Phylogeny[Rooted]', larger_tree_fp)

def test_function_without_cpu_request_param(self):
with self.assertRaisesRegex(TypeError, 'without.*n_jobs.*threads'):
Expand All @@ -125,7 +141,7 @@ def test_function_with_duplicate_cpu_allocation_params(self):
@mock.patch('psutil.cpu_count', return_value=999)
def test_system_has_no_cpu_affinity(self, mock_cpu_count, mock_process):
mock_process = psutil.Process()
mock_process.cpu_affinity = MagicMock(side_effect=AttributeError)
mock_process.cpu_affinity = mock.MagicMock(side_effect=AttributeError)
self.assertEqual(self.function_w_n_jobs_param(999), 999)
assert mock_process.cpu_affinity.called

Expand Down Expand Up @@ -158,3 +174,38 @@ def test_auto_passed_to_cpu_request(self, mock_process):
self.assertEqual(self.function_w_n_jobs_param(n_jobs='auto'), 3)
self.assertEqual(self.function_w_threads_param('auto'), 3)
self.assertEqual(self.function_w_threads_param(threads='auto'), 3)

@mock.patch("q2_diversity_lib._util.psutil.Process")
ChrisKeefe marked this conversation as resolved.
Show resolved Hide resolved
def test_cpu_request_through_framework(self, mock_process):
mock_process = psutil.Process()
mock_process.cpu_affinity = mock.MagicMock(return_value=[0])

self.jaccard_thru_framework(self.larger_table_as_artifact, n_jobs=1)
self.jaccard_thru_framework(self.larger_table_as_artifact,
n_jobs='auto')
self.unweighted_unifrac_thru_framework(self.larger_table_as_artifact,
self.larger_tree_as_artifact,
threads=1)
self.unweighted_unifrac_thru_framework(self.larger_table_as_artifact,
self.larger_tree_as_artifact,
threads='auto')
# If we get here, then it ran without error
self.assertTrue(True)

@mock.patch("q2_diversity_lib._util.psutil.Process")
def test_more_threads_than_max_stripes(self, mock_process):
mock_process = psutil.Process()
mock_process.cpu_affinity = mock.MagicMock(return_value=[0])

# The two_feature_table used here has only three samples, meaning
# that it has a max of (3+1)/2 = 2 stripes. Unifrac may report
# requests of more-threads-than-stripes to stderror, but should handle
# that situation gracefully.
self.unweighted_unifrac_thru_framework(
self.two_feature_table_as_artifact,
self.valid_tree_as_artifact, threads=1)
self.unweighted_unifrac_thru_framework(
self.two_feature_table_as_artifact,
self.valid_tree_as_artifact, threads='auto')
# If we get here, then it ran without error
self.assertTrue(True)