Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move CalcJob.presubmit call from CalcJob.run to Waiting.execute #3666

Merged
merged 1 commit into from
Dec 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
the routines make reference to the suitable plugins for all
plugin-specific operations.
"""

import os


from aiida.common import AIIDA_LOGGER, exceptions
from aiida.common.datastructures import CalcJobState
from aiida.common.folders import SandboxFolder
Expand All @@ -31,14 +29,13 @@
execlogger = AIIDA_LOGGER.getChild('execmanager')


def upload_calculation(node, transport, calc_info, script_filename, inputs=None, dry_run=False):
def upload_calculation(node, transport, calc_info, folder, inputs=None, dry_run=False):
"""Upload a `CalcJob` instance

:param node: the `CalcJobNode`.
:param transport: an already opened transport to use to submit the calculation.
:param calc_info: the calculation info datastructure returned by `CalcJobNode.presubmit`
:param script_filename: the job launch script returned by `CalcJobNode.presubmit`
:return: tuple of ``calc_info`` and ``script_filename``
:param calc_info: the calculation info datastructure returned by `CalcJob.presubmit`
:param folder: temporary local file system folder containing the inputs written by `CalcJob.prepare_for_submission`
"""
from logging import LoggerAdapter
from tempfile import NamedTemporaryFile
Expand All @@ -65,7 +62,8 @@ def upload_calculation(node, transport, calc_info, script_filename, inputs=None,
raise ValueError('Cannot submit calculation {} because it has cached input links! If you just want to test the '
'submission, set `metadata.dry_run` to True in the inputs.'.format(node.pk))

folder = node._raw_input_folder
# After this call, no modifications to the folder should be done
node.put_object_from_tree(folder.abspath, force=True)

# If we are performing a dry-run, the working directory should actually be a local folder that should already exist
if dry_run:
Expand Down Expand Up @@ -251,8 +249,6 @@ def find_data_node(inputs, uuid):
remotedata.add_incoming(node, link_type=LinkType.CREATE, link_label='remote_folder')
remotedata.store()

return calc_info, script_filename


def submit_calculation(calculation, transport, calc_info, script_filename):
"""Submit a previously uploaded `CalcJob` to the scheduler.
Expand Down
117 changes: 50 additions & 67 deletions aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,55 +214,31 @@ def run(self):
This means invoking the `presubmit` and storing the temporary folder in the node's repository. Then we move the
process in the `Wait` state, waiting for the `UPLOAD` transport task to be started.
"""
from aiida.orm import Code, load_node
from aiida.common.folders import SandboxFolder, SubmitTestFolder
from aiida.common.exceptions import InputValidationError

# The following conditional is required for the caching to properly work. Even if the source node has a process
# state of `Finished` the cached process will still enter the running state. The process state will have then
# been overridden by the engine to `Running` so we cannot check that, but if the `exit_status` is anything other
# than `None`, it should mean this node was taken from the cache, so the process should not be rerun.
if self.node.exit_status is not None:
return self.node.exit_status

if self.inputs.metadata.dry_run:
folder_class = SubmitTestFolder
else:
folder_class = SandboxFolder

with folder_class() as folder:
computer = self.node.computer

if not self.inputs.metadata.dry_run and self.node.has_cached_links():
raise exceptions.InvalidOperation('calculation node has unstored links in cache')

calc_info, script_filename = self.presubmit(folder)
calc_info.uuid = str(self.uuid)
input_codes = [load_node(_.code_uuid, sub_classes=(Code,)) for _ in calc_info.codes_info]

for code in input_codes:
if not code.can_run_on(computer):
raise InputValidationError(
'The selected code {} for calculation {} cannot run on computer {}'.format(
code.pk, self.node.pk, computer.name))

# After this call, no modifications to the folder should be done
self.node.put_object_from_tree(folder.abspath, force=True)
from aiida.common.folders import SubmitTestFolder
from aiida.engine.daemon.execmanager import upload_calculation
from aiida.transports.plugins.local import LocalTransport

if self.inputs.metadata.dry_run:
from aiida.engine.daemon.execmanager import upload_calculation
from aiida.transports.plugins.local import LocalTransport
with LocalTransport() as transport:
with LocalTransport() as transport:
with SubmitTestFolder() as folder:
calc_info, script_filename = self.presubmit(folder)
transport.chdir(folder.abspath)
upload_calculation(self.node, transport, calc_info, script_filename, self.inputs, dry_run=True)
upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
self.node.dry_run_info = {
'folder': folder.abspath,
'script_filename': script_filename
}
return plumpy.Stop(None, True)
return plumpy.Stop(None, True)

# The following conditional is required for the caching to properly work. Even if the source node has a process
# state of `Finished` the cached process will still enter the running state. The process state will have then
# been overridden by the engine to `Running` so we cannot check that, but if the `exit_status` is anything other
# than `None`, it should mean this node was taken from the cache, so the process should not be rerun.
if self.node.exit_status is not None:
return self.node.exit_status

# Launch the upload operation
return plumpy.Wait(msg='Waiting to upload', data=(UPLOAD_COMMAND, calc_info, script_filename))
return plumpy.Wait(msg='Waiting to upload', data=UPLOAD_COMMAND)

def prepare_for_submission(self, folder):
"""Prepare files for submission of calculation."""
Expand Down Expand Up @@ -304,7 +280,7 @@ def presubmit(self, folder):
# pylint: disable=too-many-locals,too-many-statements,too-many-branches
import os

from aiida.common.exceptions import PluginInternalError, ValidationError
from aiida.common.exceptions import PluginInternalError, ValidationError, InvalidOperation, InputValidationError
from aiida.common import json
from aiida.common.utils import validate_list_of_string_tuples
from aiida.common.datastructures import CodeInfo, CodeRunMode
Expand All @@ -315,16 +291,23 @@ def presubmit(self, folder):
computer = self.node.computer
inputs = self.node.get_incoming(link_type=LinkType.INPUT_CALC)

codes = [_ for _ in inputs.all_nodes() if isinstance(_, Code)]
if not self.inputs.metadata.dry_run and self.node.has_cached_links():
raise InvalidOperation('calculation node has unstored links in cache')

calcinfo = self.prepare_for_submission(folder)
scheduler = computer.get_scheduler()
codes = [_ for _ in inputs.all_nodes() if isinstance(_, Code)]

for code in codes:
if code.is_local():
if code.get_local_executable() in folder.get_content_list():
raise PluginInternalError('The plugin created a file {} that is also '
'the executable name!'.format(code.get_local_executable()))
if not code.can_run_on(computer):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check equivalent to the previous `is_local` / `get_local_executable` / `get_content_list` check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a different check: this one used to be done in the `run` method, while the one you reference was done in `presubmit`. I just moved the former here because it makes more sense in this place. I realize now that I can merge the two loops; I will do that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@muhrin I merged the two checks into the same loop, so this should be good to go.

raise InputValidationError('The selected code {} for calculation {} cannot run on computer {}'.format(
code.pk, self.node.pk, computer.name))

if code.is_local() and code.get_local_executable() in folder.get_content_list():
raise PluginInternalError('The plugin created a file {} that is also the executable name!'.format(
code.get_local_executable()))

calc_info = self.prepare_for_submission(folder)
calc_info.uuid = str(self.node.uuid)
scheduler = computer.get_scheduler()

# I create the job template to pass to the scheduler
job_tmpl = JobTemplate()
Expand All @@ -342,16 +325,16 @@ def presubmit(self, folder):
job_tmpl.sched_join_files = False

# Set retrieve path, add also scheduler STDOUT and STDERR
retrieve_list = (calcinfo.retrieve_list if calcinfo.retrieve_list is not None else [])
retrieve_list = (calc_info.retrieve_list if calc_info.retrieve_list is not None else [])
if (job_tmpl.sched_output_path is not None and job_tmpl.sched_output_path not in retrieve_list):
retrieve_list.append(job_tmpl.sched_output_path)
if not job_tmpl.sched_join_files:
if (job_tmpl.sched_error_path is not None and job_tmpl.sched_error_path not in retrieve_list):
retrieve_list.append(job_tmpl.sched_error_path)
self.node.set_retrieve_list(retrieve_list)

retrieve_singlefile_list = (calcinfo.retrieve_singlefile_list
if calcinfo.retrieve_singlefile_list is not None else [])
retrieve_singlefile_list = (calc_info.retrieve_singlefile_list
if calc_info.retrieve_singlefile_list is not None else [])
# a validation on the subclasses of retrieve_singlefile_list
for _, subclassname, _ in retrieve_singlefile_list:
file_sub_class = DataFactory(subclassname)
Expand All @@ -363,8 +346,8 @@ def presubmit(self, folder):
self.node.set_retrieve_singlefile_list(retrieve_singlefile_list)

# Handle the retrieve_temporary_list
retrieve_temporary_list = (calcinfo.retrieve_temporary_list
if calcinfo.retrieve_temporary_list is not None else [])
retrieve_temporary_list = (calc_info.retrieve_temporary_list
if calc_info.retrieve_temporary_list is not None else [])
self.node.set_retrieve_temporary_list(retrieve_temporary_list)

# the if is done so that if the method returns None, this is
Expand All @@ -375,10 +358,10 @@ def presubmit(self, folder):
# an exception
prepend_texts = [computer.get_prepend_text()] + \
[code.get_prepend_text() for code in codes] + \
[calcinfo.prepend_text, self.node.get_option('prepend_text')]
[calc_info.prepend_text, self.node.get_option('prepend_text')]
job_tmpl.prepend_text = '\n\n'.join(prepend_text for prepend_text in prepend_texts if prepend_text)

append_texts = [self.node.get_option('append_text'), calcinfo.append_text] + \
append_texts = [self.node.get_option('append_text'), calc_info.append_text] + \
[code.get_append_text() for code in codes] + \
[computer.get_append_text()]
job_tmpl.append_text = '\n\n'.join(append_text for append_text in append_texts if append_text)
Expand All @@ -398,11 +381,11 @@ def presubmit(self, folder):
extra_mpirun_params = self.node.get_option('mpirun_extra_params') # same for all codes in the same calc

# set the codes_info
if not isinstance(calcinfo.codes_info, (list, tuple)):
if not isinstance(calc_info.codes_info, (list, tuple)):
raise PluginInternalError('codes_info passed to CalcInfo must be a list of CalcInfo objects')

codes_info = []
for code_info in calcinfo.codes_info:
for code_info in calc_info.codes_info:

if not isinstance(code_info, CodeInfo):
raise PluginInternalError('Invalid codes_info, must be a list of CodeInfo objects')
Expand All @@ -415,7 +398,7 @@ def presubmit(self, folder):

this_withmpi = code_info.withmpi # to decide better how to set the default
if this_withmpi is None:
if len(calcinfo.codes_info) > 1:
if len(calc_info.codes_info) > 1:
raise PluginInternalError('For more than one code, it is '
'necessary to set withmpi in '
'codes_info')
Expand All @@ -439,7 +422,7 @@ def presubmit(self, folder):

if len(codes) > 1:
try:
job_tmpl.codes_run_mode = calcinfo.codes_run_mode
job_tmpl.codes_run_mode = calc_info.codes_run_mode
except KeyError:
raise PluginInternalError('Need to set the order of the code execution (parallel or serial?)')
else:
Expand Down Expand Up @@ -482,24 +465,24 @@ def presubmit(self, folder):

subfolder = folder.get_subfolder('.aiida', create=True)
subfolder.create_file_from_filelike(io.StringIO(json.dumps(job_tmpl)), 'job_tmpl.json', 'w', encoding='utf8')
subfolder.create_file_from_filelike(io.StringIO(json.dumps(calcinfo)), 'calcinfo.json', 'w', encoding='utf8')
subfolder.create_file_from_filelike(io.StringIO(json.dumps(calc_info)), 'calcinfo.json', 'w', encoding='utf8')

if calcinfo.local_copy_list is None:
calcinfo.local_copy_list = []
if calc_info.local_copy_list is None:
calc_info.local_copy_list = []

if calcinfo.remote_copy_list is None:
calcinfo.remote_copy_list = []
if calc_info.remote_copy_list is None:
calc_info.remote_copy_list = []

# Some validation
this_pk = self.node.pk if self.node.pk is not None else '[UNSTORED]'
local_copy_list = calcinfo.local_copy_list
local_copy_list = calc_info.local_copy_list
try:
validate_list_of_string_tuples(local_copy_list, tuple_length=3)
except ValidationError as exc:
raise PluginInternalError('[presubmission of calc {}] '
'local_copy_list format problem: {}'.format(this_pk, exc))

remote_copy_list = calcinfo.remote_copy_list
remote_copy_list = calc_info.remote_copy_list
try:
validate_list_of_string_tuples(remote_copy_list, tuple_length=3)
except ValidationError as exc:
Expand All @@ -519,4 +502,4 @@ def presubmit(self, folder):
'The destination path of the remote copy '
'is absolute! ({})'.format(this_pk, dest_rel_path))

return calcinfo, script_filename
return calc_info, script_filename
16 changes: 10 additions & 6 deletions aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from aiida.common.datastructures import CalcJobState
from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
from aiida.common.folders import SandboxFolder
from aiida.engine.daemon import execmanager
from aiida.engine.utils import exponential_backoff_retry, interruptable_task
from aiida.schedulers.datastructures import JobState
Expand All @@ -36,7 +37,7 @@


@coroutine
def task_upload_job(node, transport_queue, calc_info, script_filename, cancellable):
def task_upload_job(process, transport_queue, cancellable):
"""Transport task that will attempt to upload the files of a job calculation to the remote.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
Expand All @@ -46,13 +47,13 @@ def task_upload_job(node, transport_queue, calc_info, script_filename, cancellab

:param node: the node that represents the job calculation
:param transport_queue: the TransportQueue from which to request a Transport
:param calc_info: the calculation info datastructure returned by `CalcJobNode._presubmit`
:param script_filename: the job launch script returned by `CalcJobNode._presubmit`
:param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
:type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
:raises: Return if the tasks was successfully completed
:raises: TransportTaskException if after the maximum number of retries the transport task still excepted
"""
node = process.node

if node.get_state() == CalcJobState.SUBMITTING:
logger.warning('CalcJob<{}> already marked as SUBMITTING, skipping task_update_job'.format(node.pk))
raise Return(True)
Expand All @@ -66,7 +67,10 @@ def task_upload_job(node, transport_queue, calc_info, script_filename, cancellab
def do_upload():
with transport_queue.request_transport(authinfo) as request:
transport = yield cancellable.with_interrupt(request)
raise Return(execmanager.upload_calculation(node, transport, calc_info, script_filename))
with SandboxFolder() as folder:
calc_info, script_filename = process.presubmit(folder)
execmanager.upload_calculation(node, transport, calc_info, folder)
raise Return((calc_info, script_filename))

try:
logger.info('scheduled request to upload CalcJob<{}>'.format(node.pk))
Expand Down Expand Up @@ -330,7 +334,7 @@ def execute(self):

if command == UPLOAD_COMMAND:
node.set_process_status(process_status)
calc_info, script_filename = yield self._launch_task(task_upload_job, node, transport_queue, *args)
calc_info, script_filename = yield self._launch_task(task_upload_job, self.process, transport_queue)
raise Return(self.submit(calc_info, script_filename))

elif command == SUBMIT_COMMAND:
Expand Down Expand Up @@ -390,7 +394,7 @@ def _launch_task(self, coro, *args, **kwargs):
def upload(self, calc_info, script_filename):
"""Return the `Waiting` state that will `upload` the `CalcJob`."""
msg = 'Waiting for calculation folder upload'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=(UPLOAD_COMMAND, calc_info, script_filename))
return self.create_state(ProcessState.WAITING, None, msg=msg, data=UPLOAD_COMMAND)

def submit(self, calc_info, script_filename):
"""Return the `Waiting` state that will `submit` the `CalcJob`."""
Expand Down