Commit a531b10
Fixes open_mfdataset too-many-open-files error

Includes testing to demonstrate an OSError associated with opening too many
files, as encountered when using open_mfdataset. Fixed for the following
backends:

* netCDF4 backend
* scipy backend
* pynio backend

Open/close operations on h5netcdf appear to trigger an error in the h5netcdf
library itself, following correspondence with @shoyer. There are thus still
challenges with h5netcdf, so `autoclose` support for the h5netcdf backend is
currently disabled.

Note that the default is `autoclose=False` for open_mfdataset, so standard
behavior is unchanged unless `autoclose=True` is passed. This default favors
standard xarray performance over unconditionally avoiding the OSError from
opening too many files with open_mfdataset.
1 parent 93d6963 commit a531b10
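For orientation, a minimal usage sketch of the new option (the glob pattern
and paths are illustrative, not from the commit):

    # assumes xarray with this commit and many netCDF files on disk
    import xarray as xr

    # default: file handles stay open for speed, but a large enough file
    # count can exhaust the OS open-file limit
    ds = xr.open_mfdataset('data/*.nc')

    # opt in to open/close around each access to avoid
    # "OSError: [Errno 24] Too many open files"
    ds = xr.open_mfdataset('data/*.nc', autoclose=True)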

11 files changed, +603 -203 lines

doc/whats-new.rst

Lines changed: 6 additions & 0 deletions
@@ -22,6 +22,12 @@ v0.9.2 (unreleased)
 Enhancements
 ~~~~~~~~~~~~
 
+- It is now possible to set the ``autoclose=True`` argument to
+  :py:func:`~xarray.open_mfdataset` to explicitly close opened files when not
+  in use to prevent occurrence of an OS Error related to too many open files.
+  Note, the default is ``autoclose=False``, which is consistent with previous
+  xarray behavior. By `Phillip J. Wolfram <https://github.com/pwolfram>`_.
+
 Bug fixes
 ~~~~~~~~~
 

xarray/backends/api.py

Lines changed: 39 additions & 29 deletions
@@ -1,7 +1,6 @@
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
-import gzip
 import os.path
 from distutils.version import StrictVersion
 from glob import glob
@@ -133,7 +132,7 @@ def _protect_dataset_variables_inplace(dataset, cache):
 
 
 def open_dataset(filename_or_obj, group=None, decode_cf=True,
-                 mask_and_scale=True, decode_times=True,
+                 mask_and_scale=True, decode_times=True, autoclose=False,
                  concat_characters=True, decode_coords=True, engine=None,
                  chunks=None, lock=None, cache=None, drop_variables=None):
     """Load and decode a dataset from a file or file-like object.
@@ -163,6 +162,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
     decode_times : bool, optional
         If True, decode times encoded in the standard NetCDF datetime format
         into datetime objects. Otherwise, leave them encoded as numbers.
+    autoclose : bool, optional
+        If True, automatically close files to avoid OS Error of too many files
+        being open. However, this option doesn't work with streams, e.g.,
+        BytesIO.
     concat_characters : bool, optional
         If True, concatenate along the last dimension of character arrays to
         form string arrays. Dimensions will only be concatenated over (and
@@ -251,6 +254,12 @@ def maybe_decode_store(store, lock=False):
         else:
             ds2 = ds
 
+        # protect so that dataset store isn't necessarily closed, e.g.,
+        # streams like BytesIO can't be reopened
+        # datastore backend is responsible for determining this capability
+        if store._autoclose:
+            store.close()
+
         return ds2
 
     if isinstance(filename_or_obj, backends.AbstractDataStore):
@@ -270,34 +279,31 @@ def maybe_decode_store(store, lock=False):
         if filename_or_obj.endswith('.gz'):
             if engine is not None and engine != 'scipy':
                 raise ValueError('can only read gzipped netCDF files with '
-                                 "default engine or engine='scipy'")
-            # if the string ends with .gz, then gunzip and open as netcdf file
-            try:
-                store = backends.ScipyDataStore(gzip.open(filename_or_obj))
-            except TypeError as e:
-                # TODO: gzipped loading only works with NetCDF3 files.
-                if 'is not a valid NetCDF 3 file' in e.message:
-                    raise ValueError('gzipped file loading only supports '
-                                     'NetCDF 3 files.')
-                else:
-                    raise
-        else:
-            if engine is None:
-                engine = _get_default_engine(filename_or_obj,
-                                             allow_remote=True)
-            if engine == 'netcdf4':
-                store = backends.NetCDF4DataStore(filename_or_obj, group=group)
-            elif engine == 'scipy':
-                store = backends.ScipyDataStore(filename_or_obj)
-            elif engine == 'pydap':
-                store = backends.PydapDataStore(filename_or_obj)
-            elif engine == 'h5netcdf':
-                store = backends.H5NetCDFStore(filename_or_obj, group=group)
-            elif engine == 'pynio':
-                store = backends.NioDataStore(filename_or_obj)
+                                 "engine='scipy'")
             else:
-                raise ValueError('unrecognized engine for open_dataset: %r'
-                                 % engine)
+                engine = 'scipy'
+
+        if engine is None:
+            engine = _get_default_engine(filename_or_obj,
+                                         allow_remote=True)
+        if engine == 'netcdf4':
+            store = backends.NetCDF4DataStore(filename_or_obj, group=group,
+                                              autoclose=autoclose)
+        elif engine == 'scipy':
+            store = backends.ScipyDataStore(filename_or_obj,
+                                            autoclose=autoclose)
+        elif engine == 'pydap':
+            store = backends.PydapDataStore(filename_or_obj)
+        elif engine == 'h5netcdf':
+            store = backends.H5NetCDFStore(filename_or_obj, group=group,
+                                           autoclose=autoclose)
+        elif engine == 'pynio':
+            store = backends.NioDataStore(filename_or_obj,
+                                          autoclose=autoclose)
+        else:
+            raise ValueError('unrecognized engine for open_dataset: %r'
+                             % engine)
+
     if lock is None:
         lock = _default_lock(filename_or_obj, engine)
     with close_on_error(store):
@@ -479,6 +485,10 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
         Engine to use when reading files. If not provided, the default engine
         is chosen based on available dependencies, with a preference for
         'netcdf4'.
+    autoclose : bool, optional
+        If True, automatically close files to avoid OS Error of too many files
+        being open. However, this option doesn't work with streams, e.g.,
+        BytesIO.
     lock : False, True or threading.Lock, optional
         This argument is passed on to :py:func:`dask.array.from_array`. By
         default, a per-variable lock is used when reading data from netCDF
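For context on the error being avoided (a hypothetical check, not part of
this diff): the OSError comes from the per-process file-descriptor limit,
which on Unix can be inspected with the standard resource module:

    import resource

    # the soft limit is what a process can actually hit; with
    # autoclose=False, open_mfdataset holds one handle per file, so datasets
    # spanning more files than this limit raise
    # "OSError: [Errno 24] Too many open files"
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    print('open-file limits: soft=%d, hard=%d' % (soft, hard))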

xarray/backends/common.py

Lines changed: 35 additions & 2 deletions
@@ -5,6 +5,7 @@
 import logging
 import time
 import traceback
+import contextlib
 from collections import Mapping
 from distutils.version import StrictVersion
 
@@ -40,6 +41,14 @@ def _decode_variable_name(name):
         name = None
     return name
 
+def find_root(ds):
+    """
+    Helper function to find the root of a netcdf or h5netcdf dataset.
+    """
+    while ds.parent is not None:
+        ds = ds.parent
+    return ds
+
 
 def robust_getitem(array, key, catch=Exception, max_retries=6,
                    initial_delay=500):
@@ -67,6 +76,7 @@ def robust_getitem(array, key, catch=Exception, max_retries=6,
 
 
 class AbstractDataStore(Mapping):
+    _autoclose = False
 
     def __iter__(self):
         return iter(self.variables)
@@ -107,8 +117,8 @@ def load(self):
         This function will be called anytime variables or attributes
         are requested, so care should be taken to make sure its fast.
         """
-        variables = FrozenOrderedDict((_decode_variable_name(k), v) for k, v in
-                                      iteritems(self.get_variables()))
+        variables = FrozenOrderedDict((_decode_variable_name(k), v)
+                                      for k, v in self.get_variables().items())
         attributes = FrozenOrderedDict(self.get_attrs())
         return variables, attributes
 
@@ -252,3 +262,26 @@ def __getstate__(self):
     def __setstate__(self, state):
         self.__dict__.update(state)
         self.ds = self._opener(mode=self._mode)
+
+    @contextlib.contextmanager
+    def ensure_open(self, autoclose):
+        """
+        Helper function to make sure datasets are closed and opened
+        at appropriate times to avoid too many open file errors.
+
+        Use requires `autoclose=True` argument to `open_mfdataset`.
+        """
+        if self._autoclose and not self._isopen:
+            try:
+                self.ds = self._opener()
+                self._isopen = True
+                yield
+            finally:
+                if autoclose:
+                    self.close()
+        else:
+            yield
+
+    def assert_open(self):
+        if not self._isopen:
+            raise AssertionError('internal failure: file must be open if `autoclose=True` is used.')
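The ensure_open context manager above is the heart of the change: reopen the
underlying dataset on demand, and close it again afterwards when requested.
A self-contained sketch of the same pattern against a plain file (the
AutoclosingStore class is illustrative, not xarray API):

    import contextlib
    import os
    import tempfile


    class AutoclosingStore(object):
        def __init__(self, opener, autoclose):
            self._opener = opener
            self._autoclose = autoclose
            self.ds = opener()
            self._isopen = True
            if autoclose:
                self.ds.close()      # start closed; reopen lazily on access
                self._isopen = False

        @contextlib.contextmanager
        def ensure_open(self, autoclose):
            # reopen if needed; optionally close again once the caller is done
            if self._autoclose and not self._isopen:
                try:
                    self.ds = self._opener()
                    self._isopen = True
                    yield
                finally:
                    if autoclose:
                        self.ds.close()
                        self._isopen = False
            else:
                yield

        def read(self):
            with self.ensure_open(autoclose=True):
                return self.ds.read()


    # demo: stand in for a netCDF file with a throwaway temporary file
    fd, path = tempfile.mkstemp()
    os.write(fd, b'hello')
    os.close(fd)

    store = AutoclosingStore(lambda: open(path, 'rb'), autoclose=True)
    print(store.read())  # opens the file, reads b'hello', closes it again
    assert not store._isopen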

xarray/backends/h5netcdf_.py

Lines changed: 49 additions & 25 deletions
@@ -2,18 +2,27 @@
 from __future__ import division
 from __future__ import print_function
 import functools
+import operator
 import warnings
 
+import numpy as np
+
 from .. import Variable
 from ..core import indexing
 from ..core.utils import FrozenOrderedDict, close_on_error, Frozen
 from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict
 
-from .common import WritableCFDataStore, DataStorePickleMixin
+from .common import WritableCFDataStore, DataStorePickleMixin, find_root
 from .netCDF4_ import (_nc4_group, _nc4_values_and_dtype,
                        _extract_nc4_variable_encoding, BaseNetCDF4Array)
 
 
+class H5NetCDFFArrayWrapper(BaseNetCDF4Array):
+    def __getitem__(self, key):
+        with self.datastore.ensure_open(autoclose=True):
+            return self.get_array()[key]
+
+
 def maybe_decode_bytes(txt):
     if isinstance(txt, bytes_type):
         return txt.decode('utf-8')
@@ -49,49 +58,62 @@ class H5NetCDFStore(WritableCFDataStore, DataStorePickleMixin):
     """Store for reading and writing data via h5netcdf
     """
     def __init__(self, filename, mode='r', format=None, group=None,
-                 writer=None):
+                 writer=None, autoclose=False):
         if format not in [None, 'NETCDF4']:
             raise ValueError('invalid format for h5netcdf backend')
         opener = functools.partial(_open_h5netcdf_group, filename, mode=mode,
                                    group=group)
         self.ds = opener()
+        if autoclose:
+            raise NotImplemented('autoclose=True is not implemented '
+                                 'for the h5netcdf backend pending further '
+                                 'exploration, e.g., bug fixes (in h5netcdf?)')
+        self._autoclose = False
+        self._isopen = True
         self.format = format
         self._opener = opener
         self._filename = filename
         self._mode = mode
         super(H5NetCDFStore, self).__init__(writer)
 
     def open_store_variable(self, name, var):
-        dimensions = var.dimensions
-        data = indexing.LazilyIndexedArray(BaseNetCDF4Array(name, self))
-        attrs = _read_attributes(var)
+        with self.ensure_open(autoclose=False):
+            dimensions = var.dimensions
+            data = indexing.LazilyIndexedArray(
+                H5NetCDFFArrayWrapper(name, self))
+            attrs = _read_attributes(var)
 
-        # netCDF4 specific encoding
-        encoding = dict(var.filters())
-        chunking = var.chunking()
-        encoding['chunksizes'] = chunking if chunking != 'contiguous' else None
+            # netCDF4 specific encoding
+            encoding = dict(var.filters())
+            chunking = var.chunking()
+            encoding['chunksizes'] = chunking if chunking != 'contiguous' else None
 
-        # save source so __repr__ can detect if it's local or not
-        encoding['source'] = self._filename
-        encoding['original_shape'] = var.shape
+            # save source so __repr__ can detect if it's local or not
+            encoding['source'] = self._filename
+            encoding['original_shape'] = var.shape
 
         return Variable(dimensions, data, attrs, encoding)
 
     def get_variables(self):
-        return FrozenOrderedDict((k, self.open_store_variable(k, v))
-                                 for k, v in iteritems(self.ds.variables))
+        with self.ensure_open(autoclose=False):
+            return FrozenOrderedDict((k, self.open_store_variable(k, v))
+                                     for k, v in iteritems(self.ds.variables))
 
     def get_attrs(self):
-        return Frozen(_read_attributes(self.ds))
+        with self.ensure_open(autoclose=True):
+            return FrozenOrderedDict(_read_attributes(self.ds))
 
     def get_dimensions(self):
-        return self.ds.dimensions
+        with self.ensure_open(autoclose=True):
+            return self.ds.dimensions
 
     def set_dimension(self, name, length):
-        self.ds.createDimension(name, size=length)
+        with self.ensure_open(autoclose=False):
+            self.ds.createDimension(name, size=length)
 
     def set_attribute(self, key, value):
-        self.ds.setncattr(key, value)
+        with self.ensure_open(autoclose=False):
+            self.ds.setncattr(key, value)
 
     def prepare_variable(self, name, variable, check_encoding=False,
                          unlimited_dims=None):
@@ -129,12 +151,14 @@ def prepare_variable(self, name, variable, check_encoding=False,
         return nc4_var, variable.data
 
     def sync(self):
-        super(H5NetCDFStore, self).sync()
-        self.ds.sync()
+        with self.ensure_open(autoclose=True):
+            super(H5NetCDFStore, self).sync()
+            self.ds.sync()
 
     def close(self):
-        ds = self.ds
-        # netCDF4 only allows closing the root group
-        while ds.parent is not None:
-            ds = ds.parent
-        ds.close()
+        if self._isopen:
+            # netCDF4 only allows closing the root group
+            ds = find_root(self.ds)
+            if not ds._closed:
+                ds.close()
+            self._isopen = False
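Lastly, a tiny illustration of the find_root helper factored into common.py
(toy Group class; netCDF4/h5netcdf groups expose the same .parent chain, and
only the root group may be closed):

    class Group(object):
        """Stand-in for a netCDF4/h5netcdf group with a .parent pointer."""
        def __init__(self, parent=None):
            self.parent = parent

    def find_root(ds):
        # walk up the group hierarchy until the parentless root is reached
        while ds.parent is not None:
            ds = ds.parent
        return ds

    root = Group()
    leaf = Group(parent=Group(parent=root))
    assert find_root(leaf) is root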
