Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

conftest.py fixes #840

Merged
merged 5 commits into from
Jul 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 23 additions & 34 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import operator
import os
import pytest
import shutil
import sys
import tempfile
import time
import contextlib
import string

import util.file

import pytest

def timer():
if sys.version_info < (3, 3):
Expand All @@ -24,51 +28,36 @@ def pytest_addoption(parser):
help="show N slowest fixture durations (N=0 for all)."
),


def pytest_configure(config):
reporter = FixtureReporter(config)
config.pluginmanager.register(reporter, 'fixturereporter')

@contextlib.contextmanager
def _tmpdir_aux(request, tmpdir_factory, scope, name):
"""Create and return a temporary directory; remove it and its contents on context exit."""
with util.file.tmp_dir(dir=str(tmpdir_factory.getbasetemp()),
prefix='test-{}-{}-'.format(scope, name)) as tmpdir:
yield tmpdir

@pytest.fixture(scope='session')
def tmpdir_session(request, tmpdir_factory):
tmpdir = str(tmpdir_factory.mktemp('test-session'))

def reset():
shutil.rmtree(tmpdir)

request.addfinalizer(reset)
return tmpdir

"""Create a session-scope temporary directory."""
with _tmpdir_aux(request, tmpdir_factory, 'session', id(request.session)) as tmpdir:
yield tmpdir

@pytest.fixture(scope='module')
def tmpdir_module(request, tmpdir_factory):
tmpdir = str(tmpdir_factory.mktemp('test-module'))

def reset():
shutil.rmtree(tmpdir)

request.addfinalizer(reset)
return tmpdir

"""Create a module-scope temporary directory."""
with _tmpdir_aux(request, tmpdir_factory, 'module', request.module.__name__) as tmpdir:
yield tmpdir

@pytest.fixture(autouse=True)
def tmpdir_function(request, tmpdir_factory):
old_tempdir = tempfile.tempdir
old_env_tmpdir = os.environ.get('TMPDIR')
new_tempdir = str(tmpdir_factory.mktemp('test-function'))
tempfile.tempdir = new_tempdir
os.environ['TMPDIR'] = new_tempdir

def reset():
shutil.rmtree(new_tempdir)
tempfile.tmpdir = old_tempdir
if old_env_tmpdir:
os.environ['TMPDIR'] = old_env_tmpdir

request.addfinalizer(reset)
return new_tempdir

def tmpdir_function(request, tmpdir_factory, monkeypatch):
"""Create a temporary directory and set it to be used by the tempfile module and as the TMPDIR environment variable."""
with _tmpdir_aux(request, tmpdir_factory, 'node', request.node.name) as tmpdir:
monkeypatch.setattr(tempfile, 'tempdir', tmpdir)
monkeypatch.setenv('TMPDIR', tmpdir)
yield tmpdir

class FixtureReporter:

Expand Down
20 changes: 18 additions & 2 deletions test/unit/test_util_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
__author__ = "ilya@broadinstitute.org"

import os, os.path
import pytest
import builtins
import util.file
import pytest

def testTempFiles():
'''Test creation of tempfiles using context managers, as well as dump_file/slurp_file routines'''
Expand Down Expand Up @@ -70,6 +71,21 @@ def test_f(f):
util.file.make_empty(join(writable_dir, 'myempty.dat'))
check_paths(read_and_write=join(writable_dir, 'myempty.dat'))


def test_string_to_file_name():
"""Test util.file.string_to_file_name()"""

unichr = getattr(builtins, 'unichr', chr)

test_fnames = (
'simple', 'simple.dat', 'a/b', '/a', '/a/b', '/a/b/', 'a\\b',
'a^b&c|d" e::f!g;*?`test`', '(somecmd -f --flag < input > output) && ls',
'long' * 8000, 'oddchars\\xAA\\o037',
''.join(map(chr, range(128))) * 20,
''.join(map(unichr, range(1000))),
)

with util.file.tmp_dir() as tmp_d:
for test_fname in test_fnames:
t_path = os.path.join(tmp_d, util.file.string_to_file_name(test_fname, tmp_d))
util.file.make_empty(t_path)
assert os.path.isfile(t_path) and os.path.getsize(t_path) == 0
61 changes: 53 additions & 8 deletions util/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import errno
import logging
import json
import util.cmd
import util.misc
import sys
import io
import csv
import inspect

import util.cmd
import util.misc

from Bio import SeqIO
from Bio.Seq import Seq
Expand Down Expand Up @@ -151,14 +153,24 @@ def tempfnames(suffixes, *args, **kwargs):
def tmp_dir(*args, **kwargs):
"""Create and return a temporary directory, which is cleaned up on context exit
unless keep_tmp() is True."""

_args = inspect.getcallargs(tempfile.mkdtemp, *args, **kwargs)
length_margin = 6
for pfx_sfx in ('prefix', 'suffix'):
if _args[pfx_sfx]:
_args[pfx_sfx] = string_to_file_name(_args[pfx_sfx], file_system_path=_args['dir'], length_margin=length_margin)
length_margin += len(_args[pfx_sfx].encode('utf-8'))

name = None
try:
name = tempfile.mkdtemp(*args, **kwargs)
name = tempfile.mkdtemp(**_args)
yield name
finally:
if keep_tmp():
log.debug('keeping tempdir ' + name)
else:
shutil.rmtree(name)
if name is not None:
if keep_tmp():
log.debug('keeping tempdir ' + name)
else:
shutil.rmtree(name, ignore_errors=True)

@contextlib.contextmanager
def pushd_popd(target_dir):
Expand Down Expand Up @@ -567,11 +579,34 @@ def temp_catted_files(input_files, suffix=None, prefix=None):
finally:
os.remove(fn)

def _get_pathconf(file_system_path, param_suffix, default):
"""Return a pathconf parameter value for a filesystem.
"""
param_str = [s for s in os.pathconf_names if s.endswith(param_suffix)]
if len(param_str) == 1:
try:
return os.pathconf(file_system_path, param_str[0])
except OSError:
pass
return default

def max_file_name_length(file_system_path):
"""Return the maximum valid length of a filename (path component) on the given filesystem."""
return _get_pathconf(file_system_path, '_NAME_MAX', 80)-1

def string_to_file_name(string_value):
def max_path_length(file_system_path):
"""Return the maximum valid length of a path on the given filesystem."""
return _get_pathconf(file_system_path, '_PATH_MAX', 255)-1

def string_to_file_name(string_value, file_system_path=None, length_margin=0):
"""Constructs a valid file name from a given string, replacing or deleting invalid characters.
If `file_system_path` is given, makes sure the file name is valid on that file system.
If `length_margin` is given, ensure a string that long can be added to filename without breaking length limits.
"""
replacements_dict = {
"\\": "-", # win directory separator
"/": "-", # posix directory separator
os.sep: "-", # directory separator
"^": "_", # caret
"&": "_and_", # background
"\"": "", # double quotes
Expand Down Expand Up @@ -616,6 +651,16 @@ def string_to_file_name(string_value):
# remove leading or trailing periods (no hidden files (*NIX) or missing file extensions (NTFS))
string_value = string_value.strip(".")

# comply with file name length limits
if file_system_path is not None:
max_len = max(1, max_file_name_length(file_system_path) - length_margin)
string_value = string_value[:max_len]
while len(string_value.encode('utf-8')) > max_len:
string_value = string_value[:-1]

# ensure all the character removals did not make the name empty
string_value = string_value or '_'

return string_value

def grep_count(file_path, to_match, additional_flags=None, fixed_mode=True, starts_with=False):
Expand Down