Close files when CachingFileManager is garbage collected #2595

Merged · 22 commits · Dec 23, 2018
doc/whats-new.rst (11 additions, 0 deletions)

@@ -57,6 +57,11 @@ Enhancements
- Like :py:class:`pandas.DatetimeIndex`, :py:class:`CFTimeIndex` now supports
"dayofyear" and "dayofweek" accessors (:issue:`2597`). By `Spencer Clark
<https://github.com/spencerkclark>`_.
- The option ``'warn_for_unclosed_files'`` (False by default) has been added to
allow users to enable a warning when files opened by xarray are deallocated
but were not explicitly closed. This is mostly useful for debugging; we
recommend enabling it in your test suites if you use xarray for I/O (a usage
sketch follows this diff).
By `Stephan Hoyer <https://github.com/shoyer>`_
- Support Dask ``HighLevelGraphs`` by `Matthew Rocklin <https://matthewrocklin.com>`_.
- :py:meth:`DataArray.resample` and :py:meth:`Dataset.resample` now support the
``loffset`` kwarg just like Pandas.
@@ -68,6 +73,12 @@ Enhancements
Bug fixes
~~~~~~~~~

- Ensure files are automatically closed, if possible, when no longer referenced
by a Python variable (:issue:`2560`).
By `Stephan Hoyer <https://github.com/shoyer>`_
- Fixed possible race conditions when reading/writing to disk in parallel
(:issue:`2595`).
By `Stephan Hoyer <https://github.com/shoyer>`_
- Fix h5netcdf saving scalars with filters or chunks (:issue:`2563`).
By `Martin Raspaud <https://github.com/mraspaud>`_.
- Fix parsing of ``_Unsigned`` attribute set by OPENDAP servers. (:issue:`2583`).
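The new option is switched on through xarray.set_options. A minimal sketch, assuming a build of xarray that includes this PR; 'myfile.nc' is a placeholder path:

import xarray as xr

# Enable the option globally (it defaults to False). With it on,
# deallocating a file-backed object whose file was never explicitly
# closed emits a RuntimeWarning.
xr.set_options(warn_for_unclosed_files=True)

ds = xr.open_dataset('myfile.nc')  # placeholder path
ds.close()  # closed explicitly, so garbage collection stays silent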
xarray/backends/file_manager.py (104 additions, 20 deletions)
@@ -1,7 +1,10 @@
import contextlib
import threading
import warnings

from ..core import utils
from ..core.options import OPTIONS
from .locks import acquire
from .lru_cache import LRUCache


@@ -11,6 +14,8 @@
assert FILE_CACHE.maxsize, 'file cache must be at least size one'


REF_COUNTS = {}

_DEFAULT_MODE = utils.ReprObject('<unused>')


@@ -22,7 +27,7 @@ class FileManager(object):
many open files and transferring them between multiple processes.
"""

def acquire(self):
def acquire(self, needs_lock=True):
"""Acquire the file object from this manager."""
raise NotImplementedError

@@ -62,6 +67,9 @@ class CachingFileManager(FileManager):
def __init__(self, opener, *args, **keywords):
"""Initialize a FileManager.

The cache and ref_counts arguments exist solely to facilitate
dependency injection, and should only be set for tests.

Parameters
----------
opener : callable
@@ -90,13 +98,17 @@ def __init__(self, opener, *args, **keywords):
global variable and contains non-picklable file objects, an
unpickled FileManager objects will be restored with the default
cache.
ref_counts : dict, optional
Optional dict to use for keeping track of the number of references to
the same file.
"""
# TODO: replace with real keyword arguments when we drop Python 2
# support
mode = keywords.pop('mode', _DEFAULT_MODE)
kwargs = keywords.pop('kwargs', None)
lock = keywords.pop('lock', None)
cache = keywords.pop('cache', FILE_CACHE)
ref_counts = keywords.pop('ref_counts', REF_COUNTS)
if keywords:
raise TypeError('FileManager() got unexpected keyword arguments: '
'%s' % list(keywords))
@@ -105,34 +117,52 @@
self._args = args
self._mode = mode
self._kwargs = {} if kwargs is None else dict(kwargs)

self._default_lock = lock is None or lock is False
self._lock = threading.Lock() if self._default_lock else lock

# cache[self._key] stores the file associated with this object.
self._cache = cache
self._key = self._make_key()

# ref_counts[self._key] stores the number of CachingFileManager objects
# in memory referencing this same file. We use this to know if we can
# close a file when the manager is deallocated.
self._ref_counter = _RefCounter(ref_counts)
self._ref_counter.increment(self._key)

def _make_key(self):
"""Make a key for caching files in the LRU cache."""
value = (self._opener,
self._args,
self._mode,
'a' if self._mode == 'w' else self._mode,
tuple(sorted(self._kwargs.items())))
return _HashedSequence(value)

def acquire(self):
@contextlib.contextmanager
def _optional_lock(self, needs_lock):
"""Context manager for optionally acquiring a lock."""
if needs_lock:
with self._lock:
yield
else:
yield

def acquire(self, needs_lock=True):
"""Acquiring a file object from the manager.

A new file is only opened if it has expired from the
least-recently-used cache.

This method uses a reentrant lock, which ensures that it is
thread-safe. You can safely acquire a file in multiple threads at the
same time, as long as the underlying file object is thread-safe.
This method uses a lock, which ensures that it is thread-safe. You can
safely acquire a file in multiple threads at the same time, as long as
the underlying file object is thread-safe.

Returns
-------
An open file object, as returned by ``opener(*args, **kwargs)``.
"""
with self._lock:
with self._optional_lock(needs_lock):
try:
file = self._cache[self._key]
except KeyError:
@@ -144,28 +174,53 @@ def acquire(self):
if self._mode == 'w':
# ensure file doesn't get overridden when opened again
self._mode = 'a'
self._key = self._make_key()
self._cache[self._key] = file
return file

def _close(self):
default = None
file = self._cache.pop(self._key, default)
if file is not None:
file.close()

def close(self, needs_lock=True):
"""Explicitly close any associated file object (if necessary)."""
# TODO: remove needs_lock if/when we have a reentrant lock in
# dask.distributed: https://github.com/dask/dask/issues/3832
if needs_lock:
with self._lock:
self._close()
else:
self._close()
with self._optional_lock(needs_lock):
default = None
file = self._cache.pop(self._key, default)
if file is not None:
file.close()

def __del__(self):
# If we're the only CachingFileManager referencing an unclosed file, we
# should remove it from the cache upon garbage collection.
#
# Keeping our own count of file references might seem like overkill,
# but it's actually pretty common to reopen files with the same
# variable name in a notebook or command line environment, e.g., to
# fix the parameters used when opening a file:
# >>> ds = xarray.open_dataset('myfile.nc')
# >>> ds = xarray.open_dataset('myfile.nc', decode_times=False)
# This second assignment to "ds" drops CPython's ref-count on the first
# "ds" argument to zero, which can trigger garbage collections. So if
# we didn't check whether another object is referencing 'myfile.nc',
# the newly opened file would actually be immediately closed!
ref_count = self._ref_counter.decrement(self._key)

if not ref_count and self._key in self._cache:
if acquire(self._lock, blocking=False):
# Only close files if we can do so immediately.
try:
self.close(needs_lock=False)
finally:
self._lock.release()

if OPTIONS['warn_for_unclosed_files']:
warnings.warn(
'deallocating {}, but file is not already closed. '
'This may indicate a bug.'
.format(self), RuntimeWarning, stacklevel=2)

def __getstate__(self):
"""State for pickling."""
# cache and ref_counts are intentionally omitted: we don't want to try
# to serialize these global objects.
lock = None if self._default_lock else self._lock
return (self._opener, self._args, self._mode, self._kwargs, lock)

@@ -174,6 +229,34 @@ def __setstate__(self, state):
opener, args, mode, kwargs, lock = state
self.__init__(opener, *args, mode=mode, kwargs=kwargs, lock=lock)

def __repr__(self):
args_string = ', '.join(map(repr, self._args))
if self._mode is not _DEFAULT_MODE:
args_string += ', mode={!r}'.format(self._mode)
return '{}({!r}, {}, kwargs={})'.format(
type(self).__name__, self._opener, args_string, self._kwargs)


class _RefCounter(object):
"""Class for keeping track of reference counts."""
def __init__(self, counts):
self._counts = counts
self._lock = threading.Lock()

def increment(self, name):
with self._lock:
count = self._counts[name] = self._counts.get(name, 0) + 1
return count

def decrement(self, name):
with self._lock:
count = self._counts[name] - 1
if count:
self._counts[name] = count
else:
del self._counts[name]
return count


class _HashedSequence(list):
"""Speedup repeated look-ups by caching hash values.
@@ -198,7 +281,8 @@ class DummyFileManager(FileManager):
def __init__(self, value):
self._value = value

def acquire(self):
def acquire(self, needs_lock=True):
del needs_lock # ignored
return self._value

def close(self, needs_lock=True):
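To make the reference-counting behavior concrete, here is a sketch of two managers sharing one cached file handle. It assumes netCDF4 is installed, 'example.nc' exists, and xarray includes this PR; everything besides CachingFileManager itself is illustrative:

import netCDF4

from xarray.backends.file_manager import CachingFileManager

# Identical (opener, args, mode, kwargs) tuples hash to the same cache
# key, so both managers share one entry in FILE_CACHE and REF_COUNTS.
manager1 = CachingFileManager(netCDF4.Dataset, 'example.nc', mode='r')
manager2 = CachingFileManager(netCDF4.Dataset, 'example.nc', mode='r')

f1 = manager1.acquire()  # opens the file
f2 = manager2.acquire()  # reuses the cached handle
assert f1 is f2

del manager1  # ref count 2 -> 1: the file stays open
del manager2  # ref count 1 -> 0: __del__ closes the file

This mirrors the notebook scenario described in the __del__ comment: rebinding a variable only closes the underlying file once no other manager references it.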
xarray/backends/h5netcdf_.py (1 addition, 4 deletions)
@@ -26,8 +26,8 @@ def _getitem(self, key):
# h5py requires using lists for fancy indexing:
# https://github.com/h5py/h5py/issues/992
key = tuple(list(k) if isinstance(k, np.ndarray) else k for k in key)
array = self.get_array()
with self.datastore.lock:
array = self.get_array(needs_lock=False)
return array[key]


@@ -230,9 +230,6 @@ def prepare_variable(self, name, variable, check_encoding=False,

def sync(self):
self.ds.sync()
# if self.autoclose:
# self.close()
# super(H5NetCDFStore, self).sync(compute=compute)

def close(self, **kwargs):
self._manager.close(**kwargs)
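The _getitem change above is an instance of the needs_lock pattern: take the non-reentrant store lock once, then fetch the handle with needs_lock=False so the manager does not try to take the same lock again. A standalone sketch with illustrative names (only the needs_lock idea comes from the diff):

import threading

lock = threading.Lock()  # non-reentrant, like the shared HDF5 lock
_handle = {'x': 42}      # stand-in for an open file object

def acquire_handle(needs_lock=True):
    # Calling this with needs_lock=True while already holding `lock`
    # would deadlock, because threading.Lock is not reentrant.
    if needs_lock:
        with lock:
            return _handle
    return _handle  # caller promises it already holds `lock`

def read(key):
    with lock:  # one acquisition guards the whole read
        handle = acquire_handle(needs_lock=False)
        return handle[key]

print(read('x'))  # -> 42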
xarray/backends/locks.py (33 additions, 14 deletions)
@@ -8,6 +8,11 @@
# no need to worry about serializing the lock
SerializableLock = threading.Lock

try:
from dask.distributed import Lock as DistributedLock
except ImportError:
DistributedLock = None


# Locks used by multiple backends.
# Neither HDF5 nor the netCDF-C library are thread-safe.
@@ -33,16 +38,11 @@ def _get_multiprocessing_lock(key):
return multiprocessing.Lock()


def _get_distributed_lock(key):
from dask.distributed import Lock
return Lock(key)


_LOCK_MAKERS = {
None: _get_threaded_lock,
'threaded': _get_threaded_lock,
'multiprocessing': _get_multiprocessing_lock,
'distributed': _get_distributed_lock,
'distributed': DistributedLock,
}


@@ -113,6 +113,27 @@ def get_write_lock(key):
return lock_maker(key)


def acquire(lock, blocking=True):
"""Acquire a lock, possibly in a non-blocking fashion.

Includes backwards compatibility hacks for old versions of Python, dask
and dask-distributed.
"""
if blocking:
# no arguments needed
return lock.acquire()
elif DistributedLock is not None and isinstance(lock, DistributedLock):
# distributed.Lock doesn't support the blocking argument yet:
# https://github.com/dask/distributed/pull/2412
return lock.acquire(timeout=0)
else:
# "blocking" keyword argument not supported for:
# - threading.Lock on Python 2.
# - dask.SerializableLock with dask v1.0.0 or earlier.
# - multiprocessing.Lock calls the argument "block" instead.
return lock.acquire(blocking)
Review comment (Member): Given the date, do you think it's useful here to consider this Python 2 workaround? I guess it's the same workaround as the dask/multiprocessing issue.

class CombinedLock(object):
"""A combination of multiple locks.

@@ -123,12 +144,12 @@ class CombinedLock(object):
def __init__(self, locks):
self.locks = tuple(set(locks)) # remove duplicates

def acquire(self, *args):
return all(lock.acquire(*args) for lock in self.locks)
def acquire(self, blocking=True):
return all(acquire(lock, blocking=blocking) for lock in self.locks)

def release(self, *args):
def release(self):
for lock in self.locks:
lock.release(*args)
lock.release()

def __enter__(self):
for lock in self.locks:
@@ -138,7 +159,6 @@ def __exit__(self, *args):
for lock in self.locks:
lock.__exit__(*args)

@property
def locked(self):
return any(lock.locked for lock in self.locks)

@@ -149,10 +169,10 @@ def __repr__(self):
class DummyLock(object):
"""DummyLock provides the lock API without any actual locking."""

def acquire(self, *args):
def acquire(self, blocking=True):
pass

def release(self, *args):
def release(self):
pass

def __enter__(self):
@@ -161,7 +181,6 @@ def __enter__(self):
def __exit__(self, *args):
pass

@property
def locked(self):
return False

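The non-blocking path of acquire() is what makes the __del__ cleanup in file_manager.py safe: a destructor must never block on a lock. A sketch of that pattern, assuming the xarray version containing this PR (the print calls are illustrative):

import threading

from xarray.backends.locks import acquire

lock = threading.Lock()

# Mirror of the __del__ logic: clean up only if the lock is free right
# now; if another thread holds it, skip closing rather than block.
if acquire(lock, blocking=False):
    try:
        print('lock was free; safe to close the file here')
    finally:
        lock.release()
else:
    print('lock is busy; leave cleanup to whoever holds the lock')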