Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1913 Added minimum scheduler poll interval #1929

Merged
merged 2 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@
aiida/orm/calculation/job/simpleplugins/templatereplacer.py|
aiida/orm/calculation/work.py|
aiida/orm/code.py|
aiida/orm/computer.py|
aiida/orm/data/array/bands.py|
aiida/orm/data/array/__init__.py|
aiida/orm/data/array/kpoints.py|
Expand Down Expand Up @@ -458,7 +457,11 @@
sha: a234ce4e185cf77a55632888f1811d83b4ad9ef2
hooks:
- id: python-modernize
exclude: ^docs/
exclude: >
(?x)^(
docs/.*|
aiida/work/utils.py # exclude because tornado WaitIterator.next() does not work with next(...)
)$
args:
- --write
- --nobackups
Expand Down
64 changes: 64 additions & 0 deletions aiida/backends/tests/work/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import unittest
from tornado.ioloop import IOLoop
from tornado.gen import coroutine

from aiida.backends.testbase import AiidaTestCase
import aiida.work as work
from aiida.work.utils import exponential_backoff_retry

ITERATION = 0
Expand Down Expand Up @@ -68,3 +70,65 @@ def coro():
except Exception as e:
print(e)
raise


class RefObjectsStore(unittest.TestCase):
def test_simple(self):
""" Test the reference counting works """
IDENTIFIER = 'a'
OBJECT = 'my string'
obj_store = work.utils.RefObjectStore()

with obj_store.get(IDENTIFIER, lambda: OBJECT) as obj:
# Make sure we got back the same object
self.assertIs(OBJECT, obj)

# Now check that the reference has the correct information
ref = obj_store._objects['a']
self.assertEqual(OBJECT, ref._obj)
self.assertEqual(1, ref.count)

# Now request the object again
with obj_store.get(IDENTIFIER) as obj2:
# ...and check the reference has had it's count upped
self.assertEqual(OBJECT, obj2)
self.assertEqual(2, ref.count)

# Now it should have been reduced
self.assertEqual(1, ref.count)

# Finally the store should be empty (there are no more references)
self.assertEqual(0, len(obj_store._objects))

def test_get_no_constructor(self):
"""
Test that trying to get an object that does exists and providing
no means to construct it fails
"""
obj_store = work.utils.RefObjectStore()
with self.assertRaises(ValueError):
with obj_store.get('a'):
pass

def test_construct(self):
""" Test that construction only gets called when used """
IDENTIFIER = 'a'
OBJECT = 'my string'

# Use a list for a single number so we can get references to it
times_constructed = [0, ]

def construct():
times_constructed[0] += 1
return OBJECT

obj_store = work.utils.RefObjectStore()
with obj_store.get(IDENTIFIER, construct):
self.assertEqual(1, times_constructed[0])
with obj_store.get(IDENTIFIER, construct):
self.assertEqual(1, times_constructed[0])

# Now the object should be removed and so another call to get
# should create
with obj_store.get(IDENTIFIER, construct):
self.assertEqual(2, times_constructed[0])
91 changes: 6 additions & 85 deletions aiida/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,49 +238,6 @@ def submit_calculation(calculation, transport, calc_info, script_filename):
calculation._set_job_id(job_id)


def update_calculation(calculation, transport):
"""
Update the scheduler state of a calculation

:param calculation: the instance of JobCalculation to update.
:param transport: an already opened transport to use to query the scheduler
"""
scheduler = calculation.get_computer().get_scheduler()
scheduler.set_transport(transport)

job_id = calculation.get_job_id()

kwargs = {'as_dict': True}

if scheduler.get_feature('can_query_by_user'):
kwargs['user'] = "$USER"
else:
# In general schedulers can either query by user or by jobs, but not both
# (see also docs of the Scheduler class)
kwargs['jobs'] = [job_id]

found_jobs = scheduler.getJobs(**kwargs)
job_info = found_jobs.get(job_id, None)

if job_info is None:
# If the job is computed or not found assume it's done
job_done = True
calculation._set_scheduler_state(JOB_STATES.DONE)
else:
job_done = job_info.job_state == JOB_STATES.DONE
update_job_calc_from_job_info(calculation, job_info)

if job_done:
try:
detailed_job_info = scheduler.get_detailed_jobinfo(job_id)
except exceptions.FeatureNotAvailable:
detailed_job_info = ('This scheduler does not implement get_detailed_jobinfo')

update_job_calc_from_detailed_job_info(calculation, detailed_job_info)

return job_done


def retrieve_calculation(calculation, transport, retrieved_temporary_folder):
"""
Retrieve all the files of a completed job calculation using the given transport.
Expand Down Expand Up @@ -374,43 +331,6 @@ def kill_calculation(calculation, transport):
return True


def update_job_calc_from_job_info(calc, job_info):
"""
Updates the job info for a JobCalculation using job information
as obtained from the scheduler.

:param calc: The job calculation
:param job_info: the information returned by the scheduler for this job
:return: True if the job state is DONE, False otherwise
:rtype: bool
"""
calc._set_scheduler_state(job_info.job_state)
calc._set_last_jobinfo(job_info)

return job_info.job_state in JOB_STATES.DONE


def update_job_calc_from_detailed_job_info(calc, detailed_job_info):
"""
Updates the detailed job info for a JobCalculation as obtained from
the scheduler

:param calc: The job calculation
:param detailed_job_info: the detailed information as returned by the
scheduler for this job
"""
from aiida.scheduler.datastructures import JobInfo

last_jobinfo = calc._get_last_jobinfo()
if last_jobinfo is None:
last_jobinfo = JobInfo()
last_jobinfo.job_id = calc.get_job_id()
last_jobinfo.job_state = JOB_STATES.DONE

last_jobinfo.detailedJobinfo = detailed_job_info
calc._set_last_jobinfo(last_jobinfo)


def parse_results(job, retrieved_temporary_folder=None):
"""
Parse the results for a given JobCalculation (job)
Expand All @@ -436,11 +356,11 @@ def parse_results(job, retrieved_temporary_folder=None):
files.append("- [F] {}".format(os.path.join(root, filename)))

execlogger.debug("[parsing of calc {}] "
"Content of the retrieved_temporary_folder: \n"
"{}".format(job.pk, "\n".join(files)), extra=logger_extra)
"Content of the retrieved_temporary_folder: \n"
"{}".format(job.pk, "\n".join(files)), extra=logger_extra)
else:
execlogger.debug("[parsing of calc {}] "
"No retrieved_temporary_folder.".format(job.pk), extra=logger_extra)
"No retrieved_temporary_folder.".format(job.pk), extra=logger_extra)

if Parser is not None:

Expand All @@ -461,7 +381,7 @@ def parse_results(job, retrieved_temporary_folder=None):
pass
else:
raise ValueError("parse_from_calc returned an 'exit_code' of invalid_type: {}. It should "
"return a boolean, integer or ExitCode instance".format(type(exit_code)))
"return a boolean, integer or ExitCode instance".format(type(exit_code)))

for label, n in new_nodes_tuple:
n.add_link_from(job, label=label, link_type=LinkType.CREATE)
Expand Down Expand Up @@ -571,5 +491,6 @@ def retrieve_files_from_list(calculation, transport, folder, retrieve_list):
local_names = [os.path.split(item)[1]]

for rem, loc in zip(remote_names, local_names):
transport.logger.debug("[retrieval of calc {}] Trying to retrieve remote item '{}'".format(calculation.pk, rem))
transport.logger.debug(
"[retrieval of calc {}] Trying to retrieve remote item '{}'".format(calculation.pk, rem))
transport.get(rem, os.path.join(folder, loc), ignore_nonexisting=True)
44 changes: 34 additions & 10 deletions aiida/orm/authinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ def get(self, computer, user):
@six.add_metaclass(abc.ABCMeta)
class AuthInfo(CollectionEntry):
"""
Base class to map a DbAuthInfo, that contains computer configuration
specific to a given user (authorization info and other metadata, like
how often to check on a given computer etc.)
Store the settings of a particular user on a given computer.
(e.g. authorization info and other metadata, like how often to check on a
given computer etc.)
"""

METADATA_WORKDIR = 'workdir'

def pk(self):
"""
Return the principal key in the DB.
Expand All @@ -84,7 +86,7 @@ def enabled(self):
"""
Is the computer enabled for this user?

:return: Boolean
:rtype: bool
"""
pass

Expand All @@ -99,10 +101,22 @@ def enabled(self, value):

@abc.abstractproperty
def computer(self):
"""
The computer that this authinfo relates to

:return: The corresponding computer
:rtype: :class:`aiida.orm.Computer`
"""
pass

@abc.abstractproperty
def user(self):
"""
The user that this authinfo relates to

:return: The corresponding user
:rtype: :class:`aiida.orm.User`
"""
pass

@abc.abstractproperty
Expand All @@ -111,6 +125,7 @@ def is_stored(self):
Is it already stored or not?

:return: Boolean
:rtype: bool
"""
pass

Expand All @@ -133,7 +148,7 @@ def set_auth_params(self, auth_params):
pass

@abc.abstractmethod
def get_metadata(self):
def _get_metadata(self):
"""
Get the metadata dictionary

Expand All @@ -142,23 +157,32 @@ def get_metadata(self):
pass

@abc.abstractmethod
def set_metadata(self, metadata):
def _set_metadata(self, metadata):
"""
Replace the metadata dictionary in the DB with the provided dictionary
"""
pass

def _get_property(self, name):
try:
return self._get_metadata()[name]
except KeyError:
raise ValueError('Unknown property: {}'.format(name))

def _set_property(self, name, value):
metadata = self._get_metadata()
metadata[name] = value
self._set_metadata(metadata)

def get_workdir(self):
"""
Get the workdir; defaults to the value of the corresponding computer, if not explicitly set

:return: a string
"""
metadata = self.get_metadata()

try:
return metadata['workdir']
except KeyError:
return self._get_property(self.METADATA_WORKDIR)
except ValueError:
return self.computer.get_workdir()

def __str__(self):
Expand Down
Loading