WIP: Check size of input files to handle nfs sync issues #62

Open. Wants to merge 6 commits into master.
4 changes: 4 additions & 0 deletions sisyphus/global_settings.py
@@ -210,6 +210,10 @@ def file_caching(path):
WAIT_PERIOD_JOB_CLEANUP = 10
#: How many seconds should all inputs be available before starting a job to avoid file system synchronization problems
WAIT_PERIOD_MTIME_OF_INPUTS = 60
Contributor:
WAIT_PERIOD_MTIME_OF_INPUTS can be removed, since the check inside Task.run is removed. The setting should also be removed from toolkit.setup_script_mode.

#: How many seconds a task should wait between checking if its inputs have the expected size
WAIT_PERIOD_CHECK_FILE_SIZE = 10
#: How many seconds to wait for inputs to be synced across the network before giving up
MAX_WAIT_FILE_SYNC = 1800

#: set true to automatically clean jobs in error state and retry
CLEAR_ERROR = False
96 changes: 94 additions & 2 deletions sisyphus/job.py
@@ -6,6 +6,7 @@

"""

from ast import literal_eval
import copy
import gzip
import inspect
@@ -18,7 +19,11 @@
import sys
import time
import traceback
from typing import List, Iterator
import json
import pathlib
from collections import defaultdict
from itertools import chain
from typing import List, Iterator, Tuple, Union, Dict

from sisyphus import block, tools
from sisyphus.task import Task
@@ -651,9 +656,96 @@ def _sis_all_path_available(self):
return False
return True

def _sis_get_file_stats(self) -> List[Tuple[str, float, int]]:
"""
Returns a triple for every file below `work` and `output`: path, modification time, size.

Path is relative to the job dir, the modification time is epoch time.

These stats are written to the `usage` files by the `LoggingThread`, and read by
Job._sis_get_expected_file_sizes.

"""
stats = []
below_work = pathlib.Path(self._sis_path(gs.WORK_DIR)).rglob("*")
below_output = pathlib.Path(self._sis_path(gs.OUTPUT_DIR)).rglob("*")
for p in chain(below_work, below_output):
if p.is_file():
stat = p.stat()
rel_path = str(p.relative_to(self._sis_path()))
stats.append((rel_path, stat.st_mtime, stat.st_size))

return stats


@staticmethod
def _sis_get_expected_file_sizes(job: Union[str,"Job"], task: str = None,
timeout = gs.MAX_WAIT_FILE_SYNC) -> Dict[str, int]:
"""
Tries to obtain the expected file sizes for files below `output` and `work` from the usage
files of the given job (or job dir). Returns None if the job had already been cleaned up.

If a usage file does not contain the file size information, this is either because the
respective task is still running or because the usage file is not yet synced. In this case,
retry until timeout and raise a TimeoutError.

When accumulating the information from several files, the most recent size info is retained
for every *existing* path. That is, deleted files are not part of the returned list.

The file paths returned are relative to the experiment directory.

If `task` is given, only usage files from that task are read.

"""
# The job might be a Job object or the job directory
try:
job_dir = job._sis_path()
except AttributeError:
job_dir = job

if os.path.exists(os.path.join(job_dir, gs.JOB_FINISHED_ARCHIVE)):
@critias (Contributor), Jul 8, 2021:
This could alternatively also be checked doing something like this:

import tarfile
tf = tarfile.open(os.path.join(job_dir, gs.JOB_FINISHED_ARCHIVE))
for n in tf:
  if n.name.startswith('usage.'):
    usage = literal_eval(tf.extractfile(n).read().decode())

but I'm not sure if this would be worth the effort. The given solution should work in nearly all cases if the clean-up timeout is large enough.

logging.info("No expected file size info for job %s, is has already been cleaned up.", job_dir)
return None

m_times = defaultdict(int)
sizes = dict()

exp = "{0}.{1}.*".format(gs.PLOGGING_FILE, task if task else "*")
for fn in pathlib.Path(job_dir).glob(exp):
start = time.time()
while True:
with open(fn) as f:
try:
stats = literal_eval(f.read())["file_stats"]
Contributor:
It can happen that this raises a SyntaxError if the file is accessed while it's being written.
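A minimal sketch of how the retry loop could absorb that race, treating a SyntaxError like a not-yet-synced file (read_file_stats and poll are illustrative names, not part of this PR):

import logging
import time
from ast import literal_eval

def read_file_stats(fn, timeout, poll=10):
    # Retry until the usage file parses and contains non-empty file_stats.
    start = time.time()
    while True:
        with open(fn) as f:
            try:
                stats = literal_eval(f.read())["file_stats"]
                if stats:
                    return stats
            except SyntaxError:
                # File caught mid-write: treat like "not synced yet" and retry.
                pass
            except KeyError:
                # Usage file written by an older sisyphus version.
                return []
        if time.time() - start > timeout:
            raise TimeoutError(fn)
        time.sleep(poll)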

except KeyError:
# Fairly unlikely to happen: a job from an earlier sisyphus run should have been cleaned up by now.
logging.warning("%s contains no file_stats (was created by an older version of sisyphus).", fn)
stats = []
break

if stats:
break
if time.time() - start > timeout:
logging.error("%s not synced for more than %ds, file_stats still empty.", fn, timeout)
raise TimeoutError
logging.info("%s not synced yet, file_stats still empty.", fn)
Contributor:
This will cause problems if a path is available and should be used before the job is finished, e.g. the training of a neural model.
We could require that these paths have a special attribute set. They could then be excluded from this check.
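A sketch of what that exclusion could look like; available_before_finished is a hypothetical attribute, not an existing sisyphus API:

def inputs_to_check(inputs):
    # Skip inputs whose creator flags them as usable before the job finishes,
    # e.g. checkpoints of a still-running training job (hypothetical flag).
    return [i for i in inputs
            if not getattr(i, "available_before_finished", False)]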

time.sleep(gs.WAIT_PERIOD_CHECK_FILE_SIZE)

for (rel_path, m_time, size) in stats:
path = os.path.join(job_dir, rel_path)
# Omit deleted files
if not os.path.exists(path):
continue
if m_time > m_times[path]:
m_times[path] = m_time
sizes[path] = size

return sizes
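
For illustration, a hypothetical call (job directory and task name invented):

# Hypothetical usage: expected sizes for all files the 'run' task reported.
sizes = Job._sis_get_expected_file_sizes("work/jobs/ExampleJob.abc123", task="run")
# Keys are the job dir joined with each file's relative path, values are byte counts.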


def _sis_runnable(self):
""" True if all inputs are available, also checks if new inputs are requested """

if not self._sis_update_possible():
# Short cut used for most jobs
return self._sis_all_path_available()
90 changes: 77 additions & 13 deletions sisyphus/task.py
@@ -8,7 +8,7 @@

import sisyphus.tools as tools
import sisyphus.global_settings as gs

from . import job

class Task(object):
"""
@@ -77,6 +77,16 @@ def set_job(self, job):
def get_f(self, name):
return getattr(self._job, name)

def get_prev_task(self) -> 'Task':
""" Returns the task peceeding this one or None if it's the first one """
prev = None
for t in self._job.tasks():
if t.name() == self.name():
break
prev = t

return prev

def task_ids(self):
"""
:return: list with all valid task ids
@@ -112,22 +122,18 @@ def run(self, task_id, resume_job=False, logging_thread=None):
:param sisyphus.worker.LoggingThread logging_thread:
"""


logging.debug("Task name: %s id: %s" % (self.name(), task_id))
job = self._job

logging.info("Start Job: %s Task: %s" % (job, self.name()))
logging.info("Inputs:")
for i in self._job._sis_inputs:
logging.info(str(i))

# each input must be at least X seconds old
# if an input file is too young it may not be synced in a network filesystem yet
try:
input_age = time.time() - os.stat(i.get_path()).st_mtime
time.sleep(max(0, gs.WAIT_PERIOD_MTIME_OF_INPUTS - input_age))
except FileNotFoundError:
logging.warning('Input path does not exist: %s' % i.get_path())
logging.info("Inputs:\n%s", "\n".join( str(i) for i in self._job._sis_inputs))

try:
self._wait_for_input_to_sync()
Contributor:
It would be good to be able to switch off the new check with an entry in the settings file if something goes wrong. Even better: switch back to the old timeout.
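
A minimal sketch of such a settings-gated fallback (wait_for_input and size_checker are hypothetical names, not part of this PR; the else branch mirrors the old mtime check removed above):

import logging
import os
import time

def wait_for_input(path, size_checker=None, mtime_grace=60):
    # size_checker would be the new size-based check from this PR; passing
    # None falls back to the old behaviour, gated by a new settings entry.
    if size_checker is not None:
        size_checker(path)
    else:
        # Old behaviour: each input must be at least mtime_grace seconds old.
        try:
            input_age = time.time() - os.stat(path).st_mtime
            time.sleep(max(0, mtime_grace - input_age))
        except FileNotFoundError:
            logging.warning("Input path does not exist: %s", path)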

except TimeoutError:
self.error(task_id, True)
Contributor:
This doesn't really stop the task from running. It sets the error marker and then continues. Once the task is finished, a finished marker is set and the error marker is ignored.
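
For illustration, a hedged sketch of one way to make the timeout actually abort the task, assuming re-raising is the desired behaviour (not part of this PR):

try:
    self._wait_for_input_to_sync()
except TimeoutError:
    self.error(task_id, True)
    # Without re-raising, execution continues and a later 'finished'
    # marker would override the error marker.
    raise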


for i in self._job._sis_inputs:
if i.creator and gs.ENABLE_LAST_USAGE:
# mark that input was used
try:
@@ -209,6 +215,64 @@ def run(self, task_id, resume_job=False, logging_thread=None):
sys.stderr.flush()
logging.info("Job finished successful")

def _wait_for_input_to_sync(self):
"""
Waits for the input files of this task to be synced across the network, raising a
TimeoutError if they do not reach their expected size in time.

The input files are either the output files of other jobs or the output files of a
preceding task of this job.

"""
# Collect expected file sizes, either from a preceding task or from other jobs
logging.info("Getting expected input sizes ...")

expected_sizes = {}
prev = self.get_prev_task()

if prev:
expected_sizes = job.Job._sis_get_expected_file_sizes(self._job, task=prev.name())
if expected_sizes is None:
logging.warning("This tasks job has already been cleanup up, shouldn't happen!")
expected_sizes = {}
else:
for i in self._job._sis_inputs:
if not i.creator:
logging.info("Cannot check the size of %s, it's not created by sisyphus.", i)
continue

other_job_sizes = job.Job._sis_get_expected_file_sizes(i.creator)
# If the job has been cleaned up, no size info is available, but we can safely
# assume that enough time has passed so that all files are synced.
if other_job_sizes:
expected_sizes[i.rel_path()] = other_job_sizes[i.rel_path()]
Contributor:
This fails if a path is only used as a prefix without representing a real file. We could change it to something like this:

try:
    expected_sizes[rel_path] = other_job_sizes[rel_path]
except KeyError:
    for k, v in other_job_sizes.items():
        if k.startswith(rel_path):
            expected_sizes[k] = v

This could also be used if the path is pointing to a directory.
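
One detail worth guarding against in the startswith variant (my note, not from the review): a bare prefix like "out" would also match "out2/file". Requiring a path-separator boundary avoids that:

import os

def sizes_under_prefix(other_job_sizes, rel_path):
    # Match the exact file, or files below rel_path treated as a directory.
    prefix = rel_path.rstrip(os.sep) + os.sep
    return {k: v for k, v in other_job_sizes.items()
            if k == rel_path or k.startswith(prefix)}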


s = "\n".join("{0}\t{1}".format(*i) for i in expected_sizes.items())
logging.debug("Expected file sizes:\n%s", s)

# Make sure the files have the required size
logging.info("Waiting for the filesystem to sync files ...")
for path, expected_size in expected_sizes.items():

start = time.time()
while True:
try:
cur_size = os.stat(path).st_size
except FileNotFoundError:
cur_size = -1

if cur_size == expected_size:
logging.debug("%s is synced (size: %s)", path, cur_size)
break

if time.time() - start > gs.MAX_WAIT_FILE_SYNC:
logging.error("%s not synced for more than MAX_WAIT_FILE_SYNC.", path)
raise TimeoutError

logging.info("%s not synced yet (current size %d, expected: %d).", path, cur_size, expected_size)
time.sleep(gs.WAIT_PERIOD_CHECK_FILE_SIZE)


def task_name(self):
return '%s.%s' % (self._job._sis_id(), self.name())

13 changes: 10 additions & 3 deletions sisyphus/worker.py
@@ -80,7 +80,7 @@ def run(self):
except KeyError:
pass

def log_usage(current):
def log_usage(current, file_stats=[]):
with open(usage_file_path, 'w') as usage_file:
usage = {'max': max_resources,
'current': current,
@@ -90,7 +90,8 @@ def log_usage(current):
'host': socket.gethostname(),
'current_time': time.ctime(),
'out_of_memory': self.out_of_memory,
'requested_resources': self.rqmt}
'requested_resources': self.rqmt,
'file_stats': file_stats}
usage_file.write("%s\n" % pprint.pformat(usage))

last_log_value = 0
@@ -137,7 +138,13 @@ def log_usage(current):
# if max_mem and (max_mem - last_rss) / max_mem < 0.02 or max_mem - last_rss < 2**28:
# self.task.check_state(gs.JOB_CLOSE_TO_MAX_MEM, task_id=self.task_id, update=True)

log_usage(resources)
file_stats = self.job._sis_get_file_stats()
logging.debug("File stats:")
for (path, mtime, size) in file_stats:
logging.debug("%s (size: %s, mtime: %s)", path, size,
time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(mtime)))

log_usage(resources, file_stats)
logging.info("Max resources: Run time: {time} CPU: {cpu}% RSS: {rss} VMS: {vms}"
"".format(time=format_time(time.time() - start_time),
cpu=max_resources['cpu'],