diff --git a/conftest.py b/conftest.py index 8495ef5bf..3ef905f6d 100644 --- a/conftest.py +++ b/conftest.py @@ -1,11 +1,15 @@ import operator import os -import pytest import shutil import sys import tempfile import time +import contextlib +import string + +import util.file +import pytest def timer(): if sys.version_info < (3, 3): @@ -24,51 +28,36 @@ def pytest_addoption(parser): help="show N slowest fixture durations (N=0 for all)." ), - def pytest_configure(config): reporter = FixtureReporter(config) config.pluginmanager.register(reporter, 'fixturereporter') +@contextlib.contextmanager +def _tmpdir_aux(request, tmpdir_factory, scope, name): + """Create and return a temporary directory; remove it and its contents on context exit.""" + with util.file.tmp_dir(dir=str(tmpdir_factory.getbasetemp()), + prefix='test-{}-{}-'.format(scope, name)) as tmpdir: + yield tmpdir @pytest.fixture(scope='session') def tmpdir_session(request, tmpdir_factory): - tmpdir = str(tmpdir_factory.mktemp('test-session')) - - def reset(): - shutil.rmtree(tmpdir) - - request.addfinalizer(reset) - return tmpdir - + """Create a session-scope temporary directory.""" + with _tmpdir_aux(request, tmpdir_factory, 'session', id(request.session)) as tmpdir: + yield tmpdir @pytest.fixture(scope='module') def tmpdir_module(request, tmpdir_factory): - tmpdir = str(tmpdir_factory.mktemp('test-module')) - - def reset(): - shutil.rmtree(tmpdir) - - request.addfinalizer(reset) - return tmpdir - + """Create a module-scope temporary directory.""" + with _tmpdir_aux(request, tmpdir_factory, 'module', request.module.__name__) as tmpdir: + yield tmpdir @pytest.fixture(autouse=True) -def tmpdir_function(request, tmpdir_factory): - old_tempdir = tempfile.tempdir - old_env_tmpdir = os.environ.get('TMPDIR') - new_tempdir = str(tmpdir_factory.mktemp('test-function')) - tempfile.tempdir = new_tempdir - os.environ['TMPDIR'] = new_tempdir - - def reset(): - shutil.rmtree(new_tempdir) - tempfile.tmpdir = old_tempdir - if old_env_tmpdir: - os.environ['TMPDIR'] = old_env_tmpdir - - request.addfinalizer(reset) - return new_tempdir - +def tmpdir_function(request, tmpdir_factory, monkeypatch): + """Create a temporary directory and set it to be used by the tempfile module and as the TMPDIR environment variable.""" + with _tmpdir_aux(request, tmpdir_factory, 'node', request.node.name) as tmpdir: + monkeypatch.setattr(tempfile, 'tempdir', tmpdir) + monkeypatch.setenv('TMPDIR', tmpdir) + yield tmpdir class FixtureReporter: diff --git a/test/unit/test_util_file.py b/test/unit/test_util_file.py index b83b5f03d..0c065fd1e 100644 --- a/test/unit/test_util_file.py +++ b/test/unit/test_util_file.py @@ -3,8 +3,9 @@ __author__ = "ilya@broadinstitute.org" import os, os.path -import pytest +import builtins import util.file +import pytest def testTempFiles(): '''Test creation of tempfiles using context managers, as well as dump_file/slurp_file routines''' @@ -70,6 +71,21 @@ def test_f(f): util.file.make_empty(join(writable_dir, 'myempty.dat')) check_paths(read_and_write=join(writable_dir, 'myempty.dat')) - +def test_string_to_file_name(): + """Test util.file.string_to_file_name()""" + + unichr = getattr(builtins, 'unichr', chr) + test_fnames = ( + 'simple', 'simple.dat', 'a/b', '/a', '/a/b', '/a/b/', 'a\\b', + 'a^b&c|d" e::f!g;*?`test`', '(somecmd -f --flag < input > output) && ls', + 'long' * 8000, 'oddchars\\xAA\\o037', + ''.join(map(chr, range(128))) * 20, + ''.join(map(unichr, range(1000))), + ) + with util.file.tmp_dir() as tmp_d: + for test_fname in test_fnames: + t_path = os.path.join(tmp_d, util.file.string_to_file_name(test_fname, tmp_d)) + util.file.make_empty(t_path) + assert os.path.isfile(t_path) and os.path.getsize(t_path) == 0 diff --git a/util/file.py b/util/file.py index 715ae0b5e..ddc2456e0 100644 --- a/util/file.py +++ b/util/file.py @@ -15,11 +15,13 @@ import errno import logging import json -import util.cmd -import util.misc import sys import io import csv +import inspect + +import util.cmd +import util.misc from Bio import SeqIO from Bio.Seq import Seq @@ -151,14 +153,24 @@ def tempfnames(suffixes, *args, **kwargs): def tmp_dir(*args, **kwargs): """Create and return a temporary directory, which is cleaned up on context exit unless keep_tmp() is True.""" + + _args = inspect.getcallargs(tempfile.mkdtemp, *args, **kwargs) + length_margin = 6 + for pfx_sfx in ('prefix', 'suffix'): + if _args[pfx_sfx]: + _args[pfx_sfx] = string_to_file_name(_args[pfx_sfx], file_system_path=_args['dir'], length_margin=length_margin) + length_margin += len(_args[pfx_sfx].encode('utf-8')) + + name = None try: - name = tempfile.mkdtemp(*args, **kwargs) + name = tempfile.mkdtemp(**_args) yield name finally: - if keep_tmp(): - log.debug('keeping tempdir ' + name) - else: - shutil.rmtree(name) + if name is not None: + if keep_tmp(): + log.debug('keeping tempdir ' + name) + else: + shutil.rmtree(name, ignore_errors=True) @contextlib.contextmanager def pushd_popd(target_dir): @@ -567,11 +579,34 @@ def temp_catted_files(input_files, suffix=None, prefix=None): finally: os.remove(fn) +def _get_pathconf(file_system_path, param_suffix, default): + """Return a pathconf parameter value for a filesystem. + """ + param_str = [s for s in os.pathconf_names if s.endswith(param_suffix)] + if len(param_str) == 1: + try: + return os.pathconf(file_system_path, param_str[0]) + except OSError: + pass + return default + +def max_file_name_length(file_system_path): + """Return the maximum valid length of a filename (path component) on the given filesystem.""" + return _get_pathconf(file_system_path, '_NAME_MAX', 80)-1 -def string_to_file_name(string_value): +def max_path_length(file_system_path): + """Return the maximum valid length of a path on the given filesystem.""" + return _get_pathconf(file_system_path, '_PATH_MAX', 255)-1 + +def string_to_file_name(string_value, file_system_path=None, length_margin=0): + """Constructs a valid file name from a given string, replacing or deleting invalid characters. + If `file_system_path` is given, makes sure the file name is valid on that file system. + If `length_margin` is given, ensure a string that long can be added to filename without breaking length limits. + """ replacements_dict = { "\\": "-", # win directory separator "/": "-", # posix directory separator + os.sep: "-", # directory separator "^": "_", # caret "&": "_and_", # background "\"": "", # double quotes @@ -616,6 +651,16 @@ def string_to_file_name(string_value): # remove leading or trailing periods (no hidden files (*NIX) or missing file extensions (NTFS)) string_value = string_value.strip(".") + # comply with file name length limits + if file_system_path is not None: + max_len = max(1, max_file_name_length(file_system_path) - length_margin) + string_value = string_value[:max_len] + while len(string_value.encode('utf-8')) > max_len: + string_value = string_value[:-1] + + # ensure all the character removals did not make the name empty + string_value = string_value or '_' + return string_value def grep_count(file_path, to_match, additional_flags=None, fixed_mode=True, starts_with=False):