diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py index 113f40b3c9..64ed5cc634 100644 --- a/lib/iris/fileformats/netcdf/loader.py +++ b/lib/iris/fileformats/netcdf/loader.py @@ -190,34 +190,42 @@ def _get_cf_var_data(cf_var, filename): unnecessarily slow + wasteful of memory. """ - total_bytes = cf_var.size * cf_var.dtype.itemsize - if total_bytes < _LAZYVAR_MIN_BYTES: - # Don't make a lazy array, as it will cost more memory AND more time to access. - # Instead fetch the data immediately, as a real array, and return that. - result = cf_var[:] - + if hasattr(cf_var, "_data_array"): + # The variable is not an actual netCDF4 file variable, but an emulating + # object with an attached data array (either numpy or dask), which can be + # returned immediately as-is. This is used as a hook to translate data to/from + # netcdf data container objects in other packages, such as xarray. + # See https://github.com/SciTools/iris/issues/4994 "Xarray bridge". + result = cf_var._data_array else: - # Get lazy chunked data out of a cf variable. - dtype = _get_actual_dtype(cf_var) - - # Make a data-proxy that mimics array access and can fetch from the file. - fill_value = getattr( - cf_var.cf_data, - "_FillValue", - _thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]], - ) - proxy = NetCDFDataProxy( - cf_var.shape, dtype, filename, cf_var.cf_name, fill_value - ) - # Get the chunking specified for the variable : this is either a shape, or - # maybe the string "contiguous". - chunks = cf_var.cf_data.chunking() - # In the "contiguous" case, pass chunks=None to 'as_lazy_data'. - if chunks == "contiguous": - chunks = None - - # Return a dask array providing deferred access. - result = as_lazy_data(proxy, chunks=chunks) + total_bytes = cf_var.size * cf_var.dtype.itemsize + if total_bytes < _LAZYVAR_MIN_BYTES: + # Don't make a lazy array, as it will cost more memory AND more time to access. 
+ # Instead fetch the data immediately, as a real array, and return that. + result = cf_var[:] + + else: + # Get lazy chunked data out of a cf variable. + dtype = _get_actual_dtype(cf_var) + + # Make a data-proxy that mimics array access and can fetch from the file. + fill_value = getattr( + cf_var.cf_data, + "_FillValue", + _thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]], + ) + proxy = NetCDFDataProxy( + cf_var.shape, dtype, filename, cf_var.cf_name, fill_value + ) + # Get the chunking specified for the variable : this is either a shape, or + # maybe the string "contiguous". + chunks = cf_var.cf_data.chunking() + # In the "contiguous" case, pass chunks=None to 'as_lazy_data'. + if chunks == "contiguous": + chunks = None + + # Return a dask array providing deferred access. + result = as_lazy_data(proxy, chunks=chunks) return result diff --git a/lib/iris/fileformats/netcdf/saver.py b/lib/iris/fileformats/netcdf/saver.py index 5c11d804db..1abcf44c9b 100644 --- a/lib/iris/fileformats/netcdf/saver.py +++ b/lib/iris/fileformats/netcdf/saver.py @@ -2345,71 +2345,88 @@ def _lazy_stream_data(self, data, fill_value, fill_warn, cf_var): # contains just 1 row, so the cf_var is 1D. data = data.squeeze(axis=0) - # Decide whether we are checking for fill-value collisions. - dtype = cf_var.dtype - # fill_warn allows us to skip warning if packing attributes have been - # specified. It would require much more complex operations to work out - # what the values and fill_value _would_ be in such a case. - if fill_warn: - if fill_value is not None: - fill_value_to_check = fill_value - else: - # Retain 'fill_value == None', to show that no specific value was given. - # But set 'fill_value_to_check' to a calculated value - fill_value_to_check = _thread_safe_nc.default_fillvals[ - dtype.str[1:] - ] - # Cast the check-value to the correct dtype. - # NOTE: In the case of 'S1' dtype (at least), the default (Python) value - # does not have a compatible type. 
This causes a deprecation warning at - # numpy 1.24, *and* was preventing correct fill-value checking of character - # data, since they are actually bytes (dtype 'S1'). - fill_value_to_check = np.array(fill_value_to_check, dtype=dtype) + if hasattr(cf_var, "_data_array"): + # The variable is not an actual netCDF4 file variable, but an emulating + # object with an attached data array (either numpy or dask), which should be + # copied immediately to the target. This is used as a hook to translate + # data to/from netcdf data container objects in other packages, such as + # xarray. + # See https://github.com/SciTools/iris/issues/4994 "Xarray bridge". + # N.B. also, in this case there is no need for fill-value checking as the + # data is not being translated to an in-file representation. + cf_var._data_array = data else: - # A None means we will NOT check for collisions. - fill_value_to_check = None - - fill_info = _FillvalueCheckInfo( - user_value=fill_value, - check_value=fill_value_to_check, - dtype=dtype, - varname=cf_var.name, - ) - - doing_delayed_save = is_lazy_data(data) - if doing_delayed_save: - # save lazy data with a delayed operation. For now, we just record the - # necessary information -- a single, complete delayed action is constructed - # later by a call to delayed_completion(). - def store(data, cf_var, fill_info): - # Create a data-writeable object that we can stream into, which - # encapsulates the file to be opened + variable to be written. - write_wrapper = _thread_safe_nc.NetCDFWriteProxy( - self.filepath, cf_var, self.file_write_lock + # Decide whether we are checking for fill-value collisions. + dtype = cf_var.dtype + # fill_warn allows us to skip warning if packing attributes have been + # specified. It would require much more complex operations to work out + # what the values and fill_value _would_ be in such a case. 
+ if fill_warn: + if fill_value is not None: + fill_value_to_check = fill_value + else: + # Retain 'fill_value == None', to show that no specific value was given. + # But set 'fill_value_to_check' to a calculated value + fill_value_to_check = _thread_safe_nc.default_fillvals[ + dtype.str[1:] + ] + # Cast the check-value to the correct dtype. + # NOTE: In the case of 'S1' dtype (at least), the default (Python) value + # does not have a compatible type. This causes a deprecation warning at + # numpy 1.24, *and* was preventing correct fill-value checking of character + # data, since they are actually bytes (dtype 'S1'). + fill_value_to_check = np.array( + fill_value_to_check, dtype=dtype ) - # Add to the list of delayed writes, used in delayed_completion(). - self._delayed_writes.append((data, write_wrapper, fill_info)) - # In this case, fill-value checking is done later. But return 2 dummy - # values, to be consistent with the non-streamed "store" signature. - is_masked, contains_value = False, False - return is_masked, contains_value - - else: - # Real data is always written directly, i.e. not via lazy save. - # We also check it immediately for any fill-value problems. - def store(data, cf_var, fill_info): - cf_var[:] = data - return _data_fillvalue_check(np, data, fill_info.check_value) - - # Store the data and check if it is masked and contains the fill value. - is_masked, contains_fill_value = store(data, cf_var, fill_info) - - if not doing_delayed_save: - # Issue a fill-value warning immediately, if appropriate. - _fillvalue_report( - fill_info, is_masked, contains_fill_value, warn=True + else: + # A None means we will NOT check for collisions. + fill_value_to_check = None + + fill_info = _FillvalueCheckInfo( + user_value=fill_value, + check_value=fill_value_to_check, + dtype=dtype, + varname=cf_var.name, ) + doing_delayed_save = is_lazy_data(data) + if doing_delayed_save: + # save lazy data with a delayed operation. 
For now, we just record the + # necessary information -- a single, complete delayed action is constructed + # later by a call to delayed_completion(). + def store(data, cf_var, fill_info): + # Create a data-writeable object that we can stream into, which + # encapsulates the file to be opened + variable to be written. + write_wrapper = _thread_safe_nc.NetCDFWriteProxy( + self.filepath, cf_var, self.file_write_lock + ) + # Add to the list of delayed writes, used in delayed_completion(). + self._delayed_writes.append( + (data, write_wrapper, fill_info) + ) + # In this case, fill-value checking is done later. But return 2 dummy + # values, to be consistent with the non-streamed "store" signature. + is_masked, contains_value = False, False + return is_masked, contains_value + + else: + # Real data is always written directly, i.e. not via lazy save. + # We also check it immediately for any fill-value problems. + def store(data, cf_var, fill_info): + cf_var[:] = data + return _data_fillvalue_check( + np, data, fill_info.check_value + ) + + # Store the data and check if it is masked and contains the fill value. + is_masked, contains_fill_value = store(data, cf_var, fill_info) + + if not doing_delayed_save: + # Issue a fill-value warning immediately, if appropriate. 
+ _fillvalue_report( + fill_info, is_masked, contains_fill_value, warn=True + ) + def delayed_completion(self) -> Delayed: """ Create and return a :class:`dask.delayed.Delayed` to perform file completion diff --git a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_ancil_var.py b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_ancil_var.py index 36ef025bfa..87070e00ba 100644 --- a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_ancil_var.py +++ b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_ancil_var.py @@ -15,6 +15,7 @@ from iris.exceptions import CannotAddError from iris.fileformats._nc_load_rules.helpers import build_ancil_var +from iris.fileformats.netcdf import _thread_safe_nc as threadsafe_nc @pytest.fixture @@ -31,6 +32,7 @@ def mock_engine(): def mock_cf_av_var(monkeypatch): data = np.arange(6) output = mock.Mock( + spec=threadsafe_nc.VariableWrapper, dimensions=("foo",), scale_factor=1, add_offset=0, diff --git a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_auxiliary_coordinate.py b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_auxiliary_coordinate.py index a0e105b795..369f92f238 100644 --- a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_auxiliary_coordinate.py +++ b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_auxiliary_coordinate.py @@ -22,6 +22,7 @@ from iris.exceptions import CannotAddError from iris.fileformats._nc_load_rules.helpers import build_auxiliary_coordinate from iris.fileformats.cf import CFVariable +from iris.fileformats.netcdf import _thread_safe_nc as threadsafe_nc class TestBoundsVertexDim(tests.IrisTest): @@ -238,6 +239,7 @@ def setUp(self): points = np.arange(6) self.cf_coord_var = mock.Mock( + spec=threadsafe_nc.VariableWrapper, dimensions=("foo",), scale_factor=1, add_offset=0, @@ -257,6 +259,7 @@ def setUp(self): bounds = np.arange(12).reshape(6, 2) self.cf_bounds_var = mock.Mock( + 
spec=threadsafe_nc.VariableWrapper, dimensions=("x", "nv"), scale_factor=1, add_offset=0, diff --git a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_cell_measure.py b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_cell_measure.py index 3fe64056bb..d0421186b4 100644 --- a/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_cell_measure.py +++ b/lib/iris/tests/unit/fileformats/nc_load_rules/helpers/test_build_cell_measure.py @@ -15,6 +15,7 @@ from iris.exceptions import CannotAddError from iris.fileformats._nc_load_rules.helpers import build_cell_measures +from iris.fileformats.netcdf import _thread_safe_nc as threadsafe_nc @pytest.fixture @@ -31,6 +32,7 @@ def mock_engine(): def mock_cf_cm_var(monkeypatch): data = np.arange(6) output = mock.Mock( + spec=threadsafe_nc.VariableWrapper, dimensions=("foo",), scale_factor=1, add_offset=0, diff --git a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py index 7912648786..6c487d74e7 100644 --- a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py +++ b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py @@ -24,7 +24,9 @@ def setUp(self): self.shape = (300000, 240, 200) self.expected_chunks = _optimum_chunksize(self.shape, self.shape) - def _make(self, chunksizes=None, shape=None, dtype="i4"): + def _make( + self, chunksizes=None, shape=None, dtype="i4", **extra_properties + ): cf_data = mock.MagicMock( _FillValue=None, __getitem__="", @@ -40,6 +42,7 @@ def _make(self, chunksizes=None, shape=None, dtype="i4"): cf_name="DUMMY_VAR", shape=shape, size=np.prod(shape), + **extra_properties, ) cf_var.__getitem__.return_value = mock.sentinel.real_data_accessed return cf_var @@ -90,6 +93,15 @@ def test_arraytype__100f8_is_real(self): var_data = _get_cf_var_data(cf_var, self.filename) self.assertIs(var_data, mock.sentinel.real_data_accessed) + def 
test_cf_data_emulation(self): + # Check that a variable emulation object passes its real data directly. + emulated_data = mock.Mock() + # Make a cf_var with a special extra '_data_array' property. + cf_var = self._make(chunksizes=None, _data_array=emulated_data) + result = _get_cf_var_data(cf_var, self.filename) + # This should get directly returned. + self.assertIs(emulated_data, result) + if __name__ == "__main__": tests.main() diff --git a/lib/iris/tests/unit/fileformats/netcdf/saver/test_Saver__lazy_stream_data.py b/lib/iris/tests/unit/fileformats/netcdf/saver/test_Saver__lazy_stream_data.py index 6fa40a14fe..9686c88abf 100644 --- a/lib/iris/tests/unit/fileformats/netcdf/saver/test_Saver__lazy_stream_data.py +++ b/lib/iris/tests/unit/fileformats/netcdf/saver/test_Saver__lazy_stream_data.py @@ -18,7 +18,7 @@ import numpy as np import pytest -import iris.fileformats.netcdf._thread_safe_nc as nc_threadsafe +import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc from iris.fileformats.netcdf.saver import Saver, _FillvalueCheckInfo @@ -51,8 +51,8 @@ def compute(request) -> bool: # A fixture to parametrise tests over real and lazy-type data. @staticmethod - @pytest.fixture(params=[False, True], ids=["realdata", "lazydata"]) - def data_is_lazy(request) -> bool: + @pytest.fixture(params=["realdata", "lazydata", "emulateddata"]) + def data_form(request) -> str: yield request.param @staticmethod @@ -63,35 +63,72 @@ def saver(compute) -> Saver: ) @staticmethod - def mock_var(shape): - # Create a test cf_var object - return mock.MagicMock(shape=tuple(shape), dtype=np.dtype(np.float32)) + def mock_var(shape, with_data_array): + # Create a test cf_var object. + # N.B. using 'spec=' so we can control whether it has a '_data_array' property.
+ if with_data_array: + extra_properties = { + "_data_array": mock.sentinel.initial_data_array + } + else: + extra_properties = {} + mock_cfvar = mock.MagicMock( + spec=threadsafe_nc.VariableWrapper, + shape=tuple(shape), + dtype=np.dtype(np.float32), + **extra_properties, + ) + # Give the mock cf-var a name property, as required by '_lazy_stream_data'. + # This *can't* be an extra kwarg to MagicMock __init__, since that already + # defines a specific 'name' kwarg, with a different purpose. + mock_cfvar.name = "" + return mock_cfvar - def test_data_save(self, compute, data_is_lazy): + def test_data_save(self, compute, data_form): """Real data is transferred immediately, lazy data creates a delayed write.""" saver = self.saver(compute=compute) + data = np.arange(5.0) - if data_is_lazy: + if data_form == "lazydata": data = da.from_array(data) + + cf_var = self.mock_var( + data.shape, with_data_array=(data_form == "emulateddata") + ) fill_value = -1.0 # not occurring in data - cf_var = self.mock_var(data.shape) saver._lazy_stream_data( data=data, fill_value=fill_value, fill_warn=True, cf_var=cf_var ) - assert cf_var.__setitem__.call_count == (0 if data_is_lazy else 1) - assert len(saver._delayed_writes) == (1 if data_is_lazy else 0) - if data_is_lazy: + if data_form == "lazydata": + expect_n_setitem = 0 + expect_n_delayed = 1 + elif data_form == "realdata": + expect_n_setitem = 1 + expect_n_delayed = 0 + else: + assert data_form == "emulateddata" + expect_n_setitem = 0 + expect_n_delayed = 0 + + assert cf_var.__setitem__.call_count == expect_n_setitem + assert len(saver._delayed_writes) == expect_n_delayed + + if data_form == "lazydata": result_data, result_writer, fill_info = saver._delayed_writes[0] assert result_data is data - assert isinstance(result_writer, nc_threadsafe.NetCDFWriteProxy) + assert isinstance(result_writer, threadsafe_nc.NetCDFWriteProxy) assert isinstance(fill_info, _FillvalueCheckInfo) - else: + elif data_form == "realdata": 
cf_var.__setitem__.assert_called_once_with(slice(None), data) + else: + assert data_form == "emulateddata" + assert cf_var._data_array is data - def test_warnings(self, compute, data_is_lazy): + def test_warnings(self, compute, data_form): """ - For real data, fill-value warnings are issued immediately. For lazy data, - warnings are returned from computing a delayed completion. + For real data, fill-value warnings are issued immediately. + For lazy data, warnings are returned from computing a delayed completion. + For 'emulated' data (direct array transfer), no checks + no warnings ever. N.B. The 'compute' keyword has **no effect** on this : It only causes delayed writes to be automatically actioned on exiting a Saver context. @@ -99,11 +136,15 @@ def test_warnings(self, compute, data_is_lazy): to make dask distributed operation work. """ saver = self.saver(compute=compute) + data = np.arange(5.0) - if data_is_lazy: + if data_form == "lazydata": data = da.from_array(data) + fill_value = 2.0 # IS occurring in data - cf_var = self.mock_var(data.shape) + cf_var = self.mock_var( + data.shape, with_data_array=(data_form == "emulateddata") + ) # Do initial save. When compute=True, this issues warnings with warnings.catch_warnings(record=True) as logged_warnings: @@ -111,9 +152,16 @@ def test_warnings(self, compute, data_is_lazy): data=data, fill_value=fill_value, fill_warn=True, cf_var=cf_var ) + # Check warnings issued by initial call. issued_warnings = [log.message for log in logged_warnings] - - n_expected_warnings = 0 if data_is_lazy else 1 + if data_form == "lazydata": + n_expected_warnings = 0 + elif data_form == "realdata": + n_expected_warnings = 1 + else: + # No checks in the emulated case + assert data_form == "emulateddata" + n_expected_warnings = 0 assert len(issued_warnings) == n_expected_warnings # Complete the write : any delayed warnings should be *returned*.
@@ -124,9 +172,16 @@ def test_warnings(self, compute, data_is_lazy): result2 = saver.delayed_completion().compute() issued_warnings += list(result2) - # Either way, a suitable warning should have been produced. - assert len(issued_warnings) == 1 - warning = issued_warnings[0] - msg = "contains unmasked data points equal to the fill-value, 2.0" - assert isinstance(warning, UserWarning) - assert msg in warning.args[0] + # Check warnings issued during 'completion'. + if data_form == "emulateddata": + # No checks in this case, ever. + n_expected_warnings = 0 + else: + # Otherwise, either way, a suitable warning should now have been produced. + n_expected_warnings = 1 + assert len(issued_warnings) == n_expected_warnings + if n_expected_warnings > 0: + warning = issued_warnings[0] + msg = "contains unmasked data points equal to the fill-value, 2.0" + assert isinstance(warning, UserWarning) + assert msg in warning.args[0]