diff --git a/.github/workflows/ci-workflow.yml b/.github/workflows/ci-workflow.yml index 61d7d51e42..f3ce66d397 100644 --- a/.github/workflows/ci-workflow.yml +++ b/.github/workflows/ci-workflow.yml @@ -15,6 +15,7 @@ jobs: ./.github/scripts/linux_install.sh ./.github/scripts/osx_test.sh shell: bash + timeout-minutes: 45 test_linux: name: Testing on Linux runs-on: ubuntu-latest @@ -36,6 +37,7 @@ jobs: python -m flake8 parcels/ python -m flake8 tests/ shell: bash + timeout-minutes: 45 test_windows: name: Testing on Windows runs-on: windows-2016 @@ -52,4 +54,5 @@ jobs: ./.github/scripts/windows_install.bat ./.github/scripts/windows_test.bat shell: powershell + timeout-minutes: 45 diff --git a/parcels/examples/example_dask_chunk_OCMs.py b/parcels/examples/example_dask_chunk_OCMs.py new file mode 100644 index 0000000000..62a03284fc --- /dev/null +++ b/parcels/examples/example_dask_chunk_OCMs.py @@ -0,0 +1,415 @@ +import math +from datetime import timedelta as delta +from glob import glob +from os import path + +import numpy as np +import pytest +import dask + +from parcels import AdvectionRK4 +from parcels import Field +from parcels import FieldSet +from parcels import JITParticle +from parcels import ParticleFile +from parcels import ParticleSet +from parcels import ScipyParticle +from parcels import Variable + +ptype = {'scipy': ScipyParticle, 'jit': JITParticle} + + +def fieldset_from_nemo_3D(chunk_mode): + data_path = path.join(path.dirname(__file__), 'NemoNorthSeaORCA025-N006_data/') + ufiles = sorted(glob(data_path + 'ORCA*U.nc')) + vfiles = sorted(glob(data_path + 'ORCA*V.nc')) + wfiles = sorted(glob(data_path + 'ORCA*W.nc')) + mesh_mask = data_path + 'coordinates.nc' + + filenames = {'U': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': ufiles}, + 'V': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': vfiles}, + 'W': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': wfiles}} + variables = {'U': 'uo', + 'V': 'vo', + 'W': 'wo'} + dimensions = {'U': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}, + 'V': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}, + 'W': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}} + chs = False + if chunk_mode == 'auto': + chs = 'auto' + elif chunk_mode == 'specific': + chs = {'U': {'depthu': 75, 'depthv': 75, 'depthw': 75, 'y': 16, 'x': 16}, + 'V': {'depthu': 75, 'depthv': 75, 'depthw': 75, 'y': 16, 'x': 16}, + 'W': {'depthu': 75, 'depthv': 75, 'depthw': 75, 'y': 16, 'x': 16}} + + fieldset = FieldSet.from_nemo(filenames, variables, dimensions, field_chunksize=chs) + return fieldset + + +def fieldset_from_globcurrent(chunk_mode): + filenames = path.join(path.dirname(__file__), 'GlobCurrent_example_data', + '200201*-GLOBCURRENT-L4-CUReul_hs-ALT_SUM-v02.0-fv01.0.nc') + variables = {'U': 'eastward_eulerian_current_velocity', 'V': 'northward_eulerian_current_velocity'} + dimensions = {'lat': 'lat', 'lon': 'lon', 'time': 'time'} + chs = False + if chunk_mode == 'auto': + chs = 'auto' + elif chunk_mode == 'specific': + chs = {'U': {'lat': 16, 'lon': 16}, + 'V': {'lat': 16, 'lon': 16}} + + fieldset = FieldSet.from_netcdf(filenames, variables, dimensions, field_chunksize=chs) + return fieldset + + +def fieldset_from_pop_1arcs(chunk_mode): + filenames = path.join(path.join(path.dirname(__file__), 'POPSouthernOcean_data'), 't.x1_SAMOC_flux.1690*.nc') + variables = {'U': 'UVEL', 'V': 'VVEL', 'W': 'WVEL'} + timestamps = np.expand_dims(np.array([np.datetime64('2000-%.2d-01' % m) for m in range(1, 7)]), axis=1) + dimensions = {'lon': 'ULON', 'lat': 'ULAT', 'depth': 'w_dep'} + chs = False + if chunk_mode == 'auto': + chs = 'auto' + elif chunk_mode == 'specific': + chs = {'i': 8, 'j': 8, 'w_dep': 3} + + fieldset = FieldSet.from_pop(filenames, variables, dimensions, field_chunksize=chs, timestamps=timestamps) + return fieldset + + +def compute_nemo_particle_advection(field_set, mode, lonp, latp): + + def periodicBC(particle, fieldSet, time): + if particle.lon > 15.0: + particle.lon -= 15.0 + if particle.lon < 0: + particle.lon += 15.0 + if particle.lat > 60.0: + particle.lat -= 11.0 + if particle.lat < 49.0: + particle.lat += 11.0 + + pset = ParticleSet.from_list(field_set, ptype[mode], lon=lonp, lat=latp) + pfile = ParticleFile("nemo_particles_chunk", pset, outputdt=delta(days=1)) + kernels = pset.Kernel(AdvectionRK4) + periodicBC + pset.execute(kernels, runtime=delta(days=4), dt=delta(hours=6), output_file=pfile) + return pset + + +def compute_globcurrent_particle_advection(field_set, mode, lonp, latp): + pset = ParticleSet(field_set, pclass=ptype[mode], lon=lonp, lat=latp) + pfile = ParticleFile("globcurrent_particles_chunk", pset, outputdt=delta(hours=2)) + pset.execute(AdvectionRK4, runtime=delta(days=1), dt=delta(minutes=5), output_file=pfile) + return pset + + +def compute_pop_particle_advection(field_set, mode, lonp, latp): + pset = ParticleSet.from_list(field_set, ptype[mode], lon=lonp, lat=latp) + pfile = ParticleFile("globcurrent_particles_chunk", pset, outputdt=delta(days=15)) + pset.execute(AdvectionRK4, runtime=delta(days=90), dt=delta(days=2), output_file=pfile) + return pset + + +@pytest.mark.parametrize('mode', ['jit']) +@pytest.mark.parametrize('chunk_mode', [False, 'auto', 'specific']) +def test_nemo_3D(mode, chunk_mode): + if chunk_mode == 'auto': + dask.config.set({'array.chunk-size': '2MiB'}) + else: + dask.config.set({'array.chunk-size': '128MiB'}) + field_set = fieldset_from_nemo_3D(chunk_mode) + npart = 20 + lonp = 2.5 * np.ones(npart) + latp = [i for i in 52.0+(-1e-3+np.random.rand(npart)*2.0*1e-3)] + compute_nemo_particle_advection(field_set, mode, lonp, latp) + # Nemo sample file dimensions: depthu=75, y=201, x=151 + assert (len(field_set.U.grid.load_chunk) == len(field_set.V.grid.load_chunk)) + assert (len(field_set.U.grid.load_chunk) == len(field_set.W.grid.load_chunk)) + if chunk_mode is False: + assert (len(field_set.U.grid.load_chunk) == 1) + elif chunk_mode == 'auto': + assert (len(field_set.U.grid.load_chunk) != 1) + elif chunk_mode == 'specific': + assert (len(field_set.U.grid.load_chunk) == (1 * int(math.ceil(201.0/16.0)) * int(math.ceil(151.0/16.0)))) + + +@pytest.mark.parametrize('mode', ['jit']) +@pytest.mark.parametrize('chunk_mode', [False, 'auto', 'specific']) +def test_pop(mode, chunk_mode): + if chunk_mode == 'auto': + dask.config.set({'array.chunk-size': '1MiB'}) + else: + dask.config.set({'array.chunk-size': '128MiB'}) + field_set = fieldset_from_pop_1arcs(chunk_mode) + npart = 20 + lonp = 70.0 * np.ones(npart) + latp = [i for i in -45.0+(-0.25+np.random.rand(npart)*2.0*0.25)] + compute_pop_particle_advection(field_set, mode, lonp, latp) + # POP sample file dimensions: k=20, j=60, i=60 + assert (len(field_set.U.grid.load_chunk) == len(field_set.V.grid.load_chunk)) + assert (len(field_set.U.grid.load_chunk) == len(field_set.W.grid.load_chunk)) + if chunk_mode is False: + assert (len(field_set.U.grid.load_chunk) == 1) + elif chunk_mode == 'auto': + assert (len(field_set.U.grid.load_chunk) != 1) + elif chunk_mode == 'specific': + assert (len(field_set.U.grid.load_chunk) == (int(math.ceil(21.0/3.0)) * int(math.ceil(60.0/8.0)) * int(math.ceil(60.0/8.0)))) + + +@pytest.mark.parametrize('mode', ['jit']) +@pytest.mark.parametrize('chunk_mode', [False, 'auto', 'specific']) +def test_globcurrent_2D(mode, chunk_mode): + if chunk_mode == 'auto': + dask.config.set({'array.chunk-size': '32KiB'}) + else: + dask.config.set({'array.chunk-size': '128MiB'}) + field_set = fieldset_from_globcurrent(chunk_mode) + lonp = [25] + latp = [-35] + pset = compute_globcurrent_particle_advection(field_set, mode, lonp, latp) + # GlobCurrent sample file dimensions: time=UNLIMITED, lat=41, lon=81 + assert (len(field_set.U.grid.load_chunk) == len(field_set.V.grid.load_chunk)) + if chunk_mode is False: + assert (len(field_set.U.grid.load_chunk) == 1) + elif chunk_mode == 'auto': + assert (len(field_set.U.grid.load_chunk) != 1) + elif chunk_mode == 'specific': + assert (len(field_set.U.grid.load_chunk) == (1 * int(math.ceil(41.0/16.0)) * int(math.ceil(81.0/16.0)))) + assert(abs(pset[0].lon - 23.8) < 1) + assert(abs(pset[0].lat - -35.3) < 1) + + +@pytest.mark.parametrize('mode', ['jit']) +def test_diff_entry_dimensions_chunks(mode): + data_path = path.join(path.dirname(__file__), 'NemoNorthSeaORCA025-N006_data/') + ufiles = sorted(glob(data_path + 'ORCA*U.nc')) + vfiles = sorted(glob(data_path + 'ORCA*V.nc')) + mesh_mask = data_path + 'coordinates.nc' + + filenames = {'U': {'lon': mesh_mask, 'lat': mesh_mask, 'data': ufiles}, + 'V': {'lon': mesh_mask, 'lat': mesh_mask, 'data': vfiles}} + variables = {'U': 'uo', + 'V': 'vo'} + dimensions = {'U': {'lon': 'glamf', 'lat': 'gphif', 'time': 'time_counter'}, + 'V': {'lon': 'glamf', 'lat': 'gphif', 'time': 'time_counter'}} + chs = {'U': {'depthu': 75, 'depthv': 75, 'y': 16, 'x': 16}, + 'V': {'depthu': 75, 'depthv': 75, 'y': 16, 'x': 16}} + fieldset = FieldSet.from_nemo(filenames, variables, dimensions, field_chunksize=chs) + npart = 20 + lonp = 5.2 * np.ones(npart) + latp = [i for i in 52.0+(-1e-3+np.random.rand(npart)*2.0*1e-3)] + compute_nemo_particle_advection(fieldset, mode, lonp, latp) + # Nemo sample file dimensions: depthu=75, y=201, x=151 + assert (len(fieldset.U.grid.load_chunk) == len(fieldset.V.grid.load_chunk)) + + +@pytest.mark.parametrize('mode', ['scipy', 'jit']) +def test_3d_2dfield_sampling(mode): + data_path = path.join(path.dirname(__file__), 'NemoNorthSeaORCA025-N006_data/') + ufiles = sorted(glob(data_path + 'ORCA*U.nc')) + vfiles = sorted(glob(data_path + 'ORCA*V.nc')) + mesh_mask = data_path + 'coordinates.nc' + + filenames = {'U': {'lon': mesh_mask, 'lat': mesh_mask, 'data': ufiles}, + 'V': {'lon': mesh_mask, 'lat': mesh_mask, 'data': vfiles}, + # 'nav_lon': {'lon': mesh_mask, 'lat': mesh_mask, 'data': ufiles[0]}} + 'nav_lon': {'lon': mesh_mask, 'lat': mesh_mask, 'data': [ufiles[0], ]}} + variables = {'U': 'uo', + 'V': 'vo', + 'nav_lon': 'nav_lon'} + dimensions = {'U': {'lon': 'glamf', 'lat': 'gphif', 'time': 'time_counter'}, + 'V': {'lon': 'glamf', 'lat': 'gphif', 'time': 'time_counter'}, + 'nav_lon': {'lon': 'glamf', 'lat': 'gphif'}} + fieldset = FieldSet.from_nemo(filenames, variables, dimensions, field_chunksize=False) + fieldset.nav_lon.data = np.ones(fieldset.nav_lon.data.shape, dtype=np.float32) + fieldset.add_field(Field('rectilinear_2D', np.ones((2, 2)), + lon=np.array([-10, 20]), lat=np.array([40, 80]), field_chunksize=False)) + + class MyParticle(ptype[mode]): + sample_var_curvilinear = Variable('sample_var_curvilinear') + sample_var_rectilinear = Variable('sample_var_rectilinear') + pset = ParticleSet(fieldset, pclass=MyParticle, lon=2.5, lat=52) + + def Sample2D(particle, fieldset, time): + particle.sample_var_curvilinear += fieldset.nav_lon[time, particle.depth, particle.lat, particle.lon] + particle.sample_var_rectilinear += fieldset.rectilinear_2D[time, particle.depth, particle.lat, particle.lon] + + runtime, dt = 86400*4, 6*3600 + pset.execute(pset.Kernel(AdvectionRK4) + Sample2D, runtime=runtime, dt=dt) + + assert pset.sample_var_rectilinear == runtime/dt + assert pset.sample_var_curvilinear == runtime/dt + + +@pytest.mark.parametrize('mode', ['jit']) +def test_diff_entry_chunksize_error_nemo_simple(mode): + data_path = path.join(path.dirname(__file__), 'NemoNorthSeaORCA025-N006_data/') + ufiles = sorted(glob(data_path + 'ORCA*U.nc')) + vfiles = sorted(glob(data_path + 'ORCA*V.nc')) + wfiles = sorted(glob(data_path + 'ORCA*W.nc')) + mesh_mask = data_path + 'coordinates.nc' + + filenames = {'U': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': ufiles}, + 'V': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': vfiles}, + 'W': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': wfiles}} + variables = {'U': 'uo', + 'V': 'vo', + 'W': 'wo'} + dimensions = {'U': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}, + 'V': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}, + 'W': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}} + chs = {'U': {'depthu': 75, 'y': 16, 'x': 16}, + 'V': {'depthv': 20, 'y': 4, 'x': 16}, + 'W': {'depthw': 15, 'y': 16, 'x': 4}} + try: + fieldset = FieldSet.from_nemo(filenames, variables, dimensions, field_chunksize=chs) + except ValueError: + return True + npart = 20 + lonp = 5.2 * np.ones(npart) + latp = [i for i in 52.0+(-1e-3+np.random.rand(npart)*2.0*1e-3)] + compute_nemo_particle_advection(fieldset, mode, lonp, latp) + return False + + +@pytest.mark.parametrize('mode', ['jit']) +def test_diff_entry_chunksize_error_nemo_complex_conform_depth(mode): + # ==== this test is expected to fall-back to a pre-defined minimal chunk as ==== # + # ==== the requested chunks don't match, or throw a value error. ==== # + data_path = path.join(path.dirname(__file__), 'NemoNorthSeaORCA025-N006_data/') + ufiles = sorted(glob(data_path + 'ORCA*U.nc')) + vfiles = sorted(glob(data_path + 'ORCA*V.nc')) + wfiles = sorted(glob(data_path + 'ORCA*W.nc')) + mesh_mask = data_path + 'coordinates.nc' + + filenames = {'U': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': ufiles}, + 'V': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': vfiles}, + 'W': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': wfiles}} + variables = {'U': 'uo', + 'V': 'vo', + 'W': 'wo'} + dimensions = {'U': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}, + 'V': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}, + 'W': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}} + chs = {'U': {'depthu': 75, 'depthv': 75, 'depthw': 75, 'y': 16, 'x': 16}, + 'V': {'depthu': 75, 'depthv': 75, 'depthw': 75, 'y': 4, 'x': 16}, + 'W': {'depthu': 75, 'depthv': 75, 'depthw': 75, 'y': 16, 'x': 4}} + fieldset = FieldSet.from_nemo(filenames, variables, dimensions, field_chunksize=chs) + npart = 20 + lonp = 5.2 * np.ones(npart) + latp = [i for i in 52.0+(-1e-3+np.random.rand(npart)*2.0*1e-3)] + compute_nemo_particle_advection(fieldset, mode, lonp, latp) + # Nemo sample file dimensions: depthu=75, y=201, x=151 + npart_U = 1 + npart_U = [npart_U * k for k in fieldset.U.nchunks[1:]] + npart_V = 1 + npart_V = [npart_V * k for k in fieldset.V.nchunks[1:]] + npart_W = 1 + npart_W = [npart_W * k for k in fieldset.V.nchunks[1:]] + chn = {'U': {'lat': int(math.ceil(201.0/chs['U']['y'])), + 'lon': int(math.ceil(151.0/chs['U']['x'])), + 'depth': int(math.ceil(75.0/chs['U']['depthu']))}, + 'V': {'lat': int(math.ceil(201.0/chs['V']['y'])), + 'lon': int(math.ceil(151.0/chs['V']['x'])), + 'depth': int(math.ceil(75.0/chs['V']['depthv']))}, + 'W': {'lat': int(math.ceil(201.0/chs['W']['y'])), + 'lon': int(math.ceil(151.0/chs['W']['x'])), + 'depth': int(math.ceil(75.0/chs['W']['depthw']))}} + npart_U_request = 1 + npart_U_request = [npart_U_request * chn['U'][k] for k in chn['U']] + npart_V_request = 1 + npart_V_request = [npart_V_request * chn['V'][k] for k in chn['V']] + npart_W_request = 1 + npart_W_request = [npart_W_request * chn['W'][k] for k in chn['W']] + assert (len(fieldset.U.grid.load_chunk) == len(fieldset.V.grid.load_chunk)) + assert (len(fieldset.U.grid.load_chunk) == len(fieldset.W.grid.load_chunk)) + assert (npart_U == npart_V) + assert (npart_U == npart_W) + assert (npart_U != npart_U_request) + assert (npart_V != npart_V_request) + assert (npart_W != npart_W_request) + + +@pytest.mark.parametrize('mode', ['jit']) +def test_diff_entry_chunksize_error_nemo_complex_nonconform_depth(mode): + # ==== this test is expected to fall-back to a pre-defined minimal chunk as the requested chunks don't match, or throw a value error ==== # + data_path = path.join(path.dirname(__file__), 'NemoNorthSeaORCA025-N006_data/') + ufiles = sorted(glob(data_path + 'ORCA*U.nc')) + vfiles = sorted(glob(data_path + 'ORCA*V.nc')) + wfiles = sorted(glob(data_path + 'ORCA*W.nc')) + mesh_mask = data_path + 'coordinates.nc' + + filenames = {'U': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': ufiles}, + 'V': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': vfiles}} + variables = {'U': 'uo', + 'V': 'vo'} + dimensions = {'U': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}, + 'V': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}} + chs = {'U': {'depthu': 75, 'depthv': 15, 'y': 16, 'x': 16}, + 'V': {'depthu': 75, 'depthv': 15, 'y': 4, 'x': 16}} + fieldset = FieldSet.from_nemo(filenames, variables, dimensions, field_chunksize=chs) + npart = 20 + lonp = 5.2 * np.ones(npart) + latp = [i for i in 52.0+(-1e-3+np.random.rand(npart)*2.0*1e-3)] + try: + compute_nemo_particle_advection(fieldset, mode, lonp, latp) + except IndexError: + return True + return False + + +@pytest.mark.parametrize('mode', ['jit']) +def test_erroneous_fieldset_init(mode): + data_path = path.join(path.dirname(__file__), 'NemoNorthSeaORCA025-N006_data/') + ufiles = sorted(glob(data_path + 'ORCA*U.nc')) + vfiles = sorted(glob(data_path + 'ORCA*V.nc')) + wfiles = sorted(glob(data_path + 'ORCA*W.nc')) + mesh_mask = data_path + 'coordinates.nc' + + filenames = {'U': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': ufiles}, + 'V': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': vfiles}, + 'W': {'lon': mesh_mask, 'lat': mesh_mask, 'depth': wfiles[0], 'data': wfiles}} + variables = {'U': 'uo', + 'V': 'vo', + 'W': 'wo'} + dimensions = {'U': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}, + 'V': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}, + 'W': {'lon': 'glamf', 'lat': 'gphif', 'depth': 'depthw', 'time': 'time_counter'}} + chs = {'U': {'depthu': 75, 'y': 16, 'x': 16}, + 'V': {'depthv': 75, 'y': 16, 'x': 16}, + 'W': {'depthw': 75, 'y': 16, 'x': 16}} + + try: + FieldSet.from_nemo(filenames, variables, dimensions, field_chunksize=chs) + except ValueError: + return True + return False + + +@pytest.mark.parametrize('mode', ['jit']) +def test_diff_entry_chunksize_correction_globcurrent(mode): + filenames = path.join(path.dirname(__file__), 'GlobCurrent_example_data', + '200201*-GLOBCURRENT-L4-CUReul_hs-ALT_SUM-v02.0-fv01.0.nc') + variables = {'U': 'eastward_eulerian_current_velocity', 'V': 'northward_eulerian_current_velocity'} + dimensions = {'lat': 'lat', 'lon': 'lon', 'time': 'time'} + chs = {'U': {'lat': 16, 'lon': 16}, + 'V': {'lat': 16, 'lon': 4}} + fieldset = FieldSet.from_netcdf(filenames, variables, dimensions, field_chunksize=chs) + lonp = [25] + latp = [-35] + compute_globcurrent_particle_advection(fieldset, mode, lonp, latp) + # GlobCurrent sample file dimensions: time=UNLIMITED, lat=41, lon=81 + npart_U = 1 + npart_U = [npart_U * k for k in fieldset.U.nchunks[1:]] + npart_V = 1 + npart_V = [npart_V * k for k in fieldset.V.nchunks[1:]] + npart_V_request = 1 + chn = {'U': {'lat': int(math.ceil(41.0/chs['U']['lat'])), + 'lon': int(math.ceil(81.0/chs['U']['lon']))}, + 'V': {'lat': int(math.ceil(41.0/chs['V']['lat'])), + 'lon': int(math.ceil(81.0/chs['V']['lon']))}} + npart_V_request = [npart_V_request * chn['V'][k] for k in chn['V']] + assert (npart_U == npart_V) + assert (npart_V != npart_V_request) + assert (len(fieldset.U.grid.load_chunk) == len(fieldset.V.grid.load_chunk)) diff --git a/parcels/examples/example_nemo_curvilinear.py b/parcels/examples/example_nemo_curvilinear.py index 45b19058d3..a8510bc672 100644 --- a/parcels/examples/example_nemo_curvilinear.py +++ b/parcels/examples/example_nemo_curvilinear.py @@ -28,7 +28,7 @@ def run_nemo_curvilinear(mode, outfile): 'data': data_path + 'V_purely_zonal-ORCA025_grid_V.nc4'}} variables = {'U': 'U', 'V': 'V'} dimensions = {'lon': 'glamf', 'lat': 'gphif'} - field_chunksize = {'lon': 2, 'lat': 2} + field_chunksize = {'y': 2, 'x': 2} field_set = FieldSet.from_nemo(filenames, variables, dimensions, field_chunksize=field_chunksize) assert field_set.U.field_chunksize == field_chunksize diff --git a/parcels/field.py b/parcels/field.py index ff82f530ef..304b8fd3b6 100644 --- a/parcels/field.py +++ b/parcels/field.py @@ -30,6 +30,7 @@ from parcels.tools.error import FieldSamplingError from parcels.tools.error import TimeExtrapolationError from parcels.tools.loggers import logger +from netCDF4 import Dataset as ncDataset __all__ = ['Field', 'VectorField', 'SummedField', 'NestedField'] @@ -166,7 +167,7 @@ def __init__(self, name, data, lon=None, lat=None, depth=None, time=None, grid=N self.netcdf_engine = kwargs.pop('netcdf_engine', 'netcdf4') self.loaded_time_indices = [] self.creation_log = kwargs.pop('creation_log', '') - self.field_chunksize = kwargs.pop('field_chunksize', 'auto') + self.field_chunksize = kwargs.pop('field_chunksize', None) # data_full_zdim is the vertical dimension of the complete field data, ignoring the indices. # (data_full_zdim = grid.zdim if no indices are used, for A- and C-grids and for some B-grids). It is used for the B-grid, @@ -330,6 +331,40 @@ def from_netcdf(cls, filenames, variable, dimensions, indices=None, grid=None, grid = Grid.create_grid(lon, lat, depth, time, time_origin=time_origin, mesh=mesh) grid.timeslices = timeslices + if 'field_chunksize' in kwargs.keys() and grid.master_chunksize is None: + grid.master_chunksize = kwargs['field_chunksize'] + kwargs['dataFiles'] = dataFiles + elif grid is not None and ('dataFiles' not in kwargs or kwargs['dataFiles'] is None): + # ==== means: the field has a shared grid, but may have different data files, so we need to collect the + # ==== correct file time series again. + if timestamps is not None: + dataFiles = [] + for findex in range(len(data_filenames)): + for f in [data_filenames[findex], ] * len(timestamps[findex]): + dataFiles.append(f) + field_timeslices = np.array([stamp for file in timestamps for stamp in file]) + field_time = field_timeslices + else: + field_timeslices = [] + dataFiles = [] + for fname in data_filenames: + with NetcdfFileBuffer(fname, dimensions, indices, netcdf_engine, field_chunksize=False, lock_file=True) as filebuffer: + ftime = filebuffer.time + field_timeslices.append(ftime) + dataFiles.append([fname] * len(ftime)) + field_timeslices = np.array(field_timeslices) + field_time = np.concatenate(field_timeslices) + dataFiles = np.concatenate(np.array(dataFiles)) + if field_time.size == 1 and field_time[0] is None: + field_time[0] = 0 + time_origin = TimeConverter(field_time[0]) + field_time = time_origin.reltime(field_time) + if not np.all((field_time[1:]-field_time[:-1]) > 0): + id_not_ordered = np.where(field_time[1:] < field_time[:-1])[0][0] + raise AssertionError('Please make sure your netCDF files are ordered in time. First pair of non-ordered files: %s, %s' + % (dataFiles[id_not_ordered], dataFiles[id_not_ordered+1])) + if 'field_chunksize' in kwargs.keys() and grid.master_chunksize is None: + grid.master_chunksize = kwargs['field_chunksize'] kwargs['dataFiles'] = dataFiles if 'time' in indices: @@ -901,8 +936,6 @@ def get_block(self, bid): return np.unravel_index(bid, self.nchunks[1:]) def chunk_setup(self): - if self.chunk_set: - return if isinstance(self.data, da.core.Array): chunks = self.data.chunks self.nchunks = self.data.numblocks @@ -921,11 +954,9 @@ def chunk_setup(self): self.data_chunks = [None] * npartitions self.c_data_chunks = [None] * npartitions - self.grid.load_chunk = np.zeros(npartitions, dtype=c_int) - # self.grid.chunk_info format: number of dimensions (without tdim); number of chunks per dimensions; - # chunksizes (the 0th dim sizes for all chunk of dim[0], then so on for next dims + # chunksizes (the 0th dim sizes for all chunk of dim[0], then so on for next dims self.grid.chunk_info = [[len(self.nchunks)-1], list(self.nchunks[1:]), sum(list(list(ci) for ci in chunks[1:]), [])] self.grid.chunk_info = sum(self.grid.chunk_info, []) self.chunk_set = True @@ -1585,10 +1616,11 @@ def __getitem__(self, key): class NetcdfFileBuffer(object): - _name_maps = {'lon': ['lon', 'nav_lon', 'x', 'longitude', 'lo', 'ln'], - 'lat': ['lat', 'nav_lat', 'y', 'latitude', 'la', 'lt'], - 'depth': ['depth', 'depthu', 'depths', 'depthw', 'depthz', 'z', 'd'], + _name_maps = {'lon': ['lon', 'nav_lon', 'x', 'longitude', 'lo', 'ln', 'i'], + 'lat': ['lat', 'nav_lat', 'y', 'latitude', 'la', 'lt', 'j'], + 'depth': ['depth', 'depthu', 'depthv', 'depthw', 'depths', 'deptht', 'depthx', 'depthy', 'depthz', 'z', 'd', 'k', 'w_dep', 'w_deps'], 'time': ['time', 'time_count', 'time_counter', 'timer_count', 't']} + _min_dim_chunksize = 16 """ Class that encapsulates and manages deferred access to file data. """ def __init__(self, filename, dimensions, indices, netcdf_engine, timestamp=None, @@ -1622,7 +1654,8 @@ def __enter__(self): if self.field_chunksize not in [False, None]: init_chunk_dict = self._get_initial_chunk_dictionary() try: - # Unfortunately we need to do if-else here, cause the lock-parameter is either False or a Lock-object (we would rather want to have it auto-managed). + # Unfortunately we need to do if-else here, cause the lock-parameter is either False or a Lock-object + # (which we would rather want to have being auto-managed). # If 'lock' is not specified, the Lock-object is auto-created and managed bz xarray internally. if self.lock_file: self.dataset = xr.open_dataset(str(self.filename), decode_cf=True, engine=self.netcdf_engine, chunks=init_chunk_dict) @@ -1657,8 +1690,8 @@ def close(self): self.chunk_mapping = None def _get_initial_chunk_dictionary(self): - # ==== check-opening requested dataset to access metadata ==== # - # ==== opening the file for getting the dimensions does not require a decode - so don't even try. Save some computation ==== # + # ==== check-opening requested dataset to access metadata ==== # + # ==== file-opening and dimension-reading does not require a decode or lock ==== # self.dataset = xr.open_dataset(str(self.filename), decode_cf=False, engine=self.netcdf_engine, chunks={}, lock=False) self.dataset['decoded'] = False # ==== self.dataset temporarily available ==== # @@ -1666,38 +1699,59 @@ def _get_initial_chunk_dictionary(self): if isinstance(self.field_chunksize, dict): init_chunk_dict = self.field_chunksize elif isinstance(self.field_chunksize, tuple): # and (len(self.dimensions) == len(self.field_chunksize)): + tmp_chs = [0, ] * len(self.field_chunksize) chunk_index = len(self.field_chunksize)-1 + loni, lonname, _ = self._is_dimension_in_dataset('lon') - if loni >= 0: + if loni >= 0 and chunk_index >= 0: init_chunk_dict[lonname] = self.field_chunksize[chunk_index] - chunk_index -= 1 + tmp_chs[chunk_index] = self.field_chunksize[chunk_index] else: logger.warning_once(self._netcdf_DimNotFound_warning_message('lon')) + chunk_index -= 1 + lati, latname, _ = self._is_dimension_in_dataset('lat') - if lati >= 0: + if lati >= 0 and chunk_index >= 0: init_chunk_dict[latname] = self.field_chunksize[chunk_index] - chunk_index -= 1 + tmp_chs[chunk_index] = self.field_chunksize[chunk_index] else: logger.warning_once(self._netcdf_DimNotFound_warning_message('lat')) + chunk_index -= 1 + depthi, depthname, _ = self._is_dimension_in_dataset('depth') - if depthi >= 0: - init_chunk_dict[depthname] = self.field_chunksize[chunk_index] - chunk_index -= 1 + if depthi >= 0 and chunk_index >= 0: + if self._is_dimension_available('depth'): + init_chunk_dict[depthname] = self.field_chunksize[chunk_index] + tmp_chs[chunk_index] = self.field_chunksize[chunk_index] else: logger.warning_once(self._netcdf_DimNotFound_warning_message('depth')) + chunk_index -= 1 + timei, timename, _ = self._is_dimension_in_dataset('time') - if timei >= 0: - init_chunk_dict[timename] = self.field_chunksize[chunk_index] - chunk_index -= 1 + if timei >= 0 and chunk_index >= 0: + if self._is_dimension_available('time'): + init_chunk_dict[timename] = self.field_chunksize[chunk_index] + tmp_chs[chunk_index] = self.field_chunksize[chunk_index] else: logger.warning_once(self._netcdf_DimNotFound_warning_message('time')) + chunk_index -= 1 + + # ==== re-arrange the tupe and correct for empty dimensions ==== # + for chunk_index in range(len(self.field_chunksize)-1, -1, -1): + if tmp_chs[chunk_index] < 1: + tmp_chs.pop(chunk_index) + self.field_chunksize = tuple(tmp_chs) elif self.field_chunksize == 'auto': av_mem = psutil.virtual_memory().available chunk_cap = av_mem * (1/8) * (1/3) if 'array.chunk-size' in da_conf.config.keys(): chunk_cap = da_utils.parse_bytes(da_conf.config.get('array.chunk-size')) else: - logger.info_once("Unable to locate chunking hints from dask, thus estimating the max. chunk size heuristically. Please consider defining the 'chunk-size' for 'array' in your local dask configuration file (see http://oceanparcels.org/faq.html#field_chunking_config and https://docs.dask.org).") + predefined_cap = da_conf.get('array.chunk-size') + if predefined_cap is not None: + chunk_cap = da_utils.parse_bytes(predefined_cap) + else: + logger.info_once("Unable to locate chunking hints from dask, thus estimating the max. chunk size heuristically. Please consider defining the 'chunk-size' for 'array' in your local dask configuration file (see http://oceanparcels.org/faq.html#field_chunking_config and https://docs.dask.org).") loni, lonname, lonvalue = self._is_dimension_in_dataset('lon') lati, latname, latvalue = self._is_dimension_in_dataset('lat') if lati >= 0 and loni >= 0: @@ -1712,6 +1766,41 @@ def _get_initial_chunk_dictionary(self): init_chunk_dict[depthname] = max(1, depthvalue) # ==== closing check-opened requested dataset ==== # self.dataset.close() + # ==== check if the chunksize reading is successfull. if not, load the file ONCE really into memory and ==== # + # ==== deduce the chunking from the array dims. ==== # + try: + self.dataset = xr.open_dataset(str(self.filename), decode_cf=True, engine=self.netcdf_engine, chunks=init_chunk_dict, lock=False) + except: + # ==== fail - open it as a normal array and deduce the dimensions from the read field ==== # + init_chunk_dict = {} + self.dataset = ncDataset(str(self.filename)) + refdims = self.dataset.dimensions.keys() + max_field = "" + max_dim_names = () + max_overlay_dims = 0 + for vname in self.dataset.variables: + var = self.dataset.variables[vname] + overlay_dims = [] + for vdname in var.dimensions: + if vdname in refdims: + overlay_dims.append(vdname) + n_overlay_dims = len(overlay_dims) + if n_overlay_dims > max_overlay_dims: + max_field = vname + max_dim_names = tuple(overlay_dims) + max_overlay_dims = n_overlay_dims + self.name = max_field + for dname in max_dim_names: + if isinstance(self.field_chunksize, dict): + if dname in self.field_chunksize.keys(): + init_chunk_dict[dname] = min(self.field_chunksize[dname], self.dataset.dimensions[dname].size) + continue + init_chunk_dict[dname] = min(self._min_dim_chunksize, self.dataset.dimensions[dname].size) + # ==== because in this case it has shown that the requested field_chunksize setup cannot be used, ==== # + # ==== replace the requested field_chunksize with this auto-derived version. ==== # + self.field_chunksize = init_chunk_dict + finally: + self.dataset.close() self.dataset = None # ==== self.dataset not available ==== # return init_chunk_dict @@ -1719,7 +1808,7 @@ def _get_initial_chunk_dictionary(self): def _is_dimension_available(self, dimension_name): if self.dimensions is None or self.dataset is None: return False - return (dimension_name in self.dimensions and self.dimensions[dimension_name] in self.dataset.dims) + return dimension_name in self.dimensions def _is_dimension_in_dataset(self, dimension_name): k, dname, dvalue = (-1, '', 0) @@ -1753,24 +1842,27 @@ def _chunksize_to_chunkmap(self): if self.field_chunksize in [False, 'auto', None]: return self.chunk_mapping = {} - if (isinstance(self.field_chunksize, tuple)): + if isinstance(self.field_chunksize, tuple): for i in range(len(self.field_chunksize)): self.chunk_mapping[i] = self.field_chunksize[i] else: - # ====== 'time' is strictly excluded from the reading dimensions as it is implicitly organized with the data ====== # + timei, timename, timevalue = self._is_dimension_in_chunksize_request('time') + dtimei, dtimename, dtimevalue = self._is_dimension_in_dataset('time') depthi, depthname, depthvalue = self._is_dimension_in_chunksize_request('depth') + ddepthi, ddepthname, ddepthvalue = self._is_dimension_in_dataset('depth') lati, latname, latvalue = self._is_dimension_in_chunksize_request('lat') loni, lonname, lonvalue = self._is_dimension_in_chunksize_request('lon') dim_index = 0 if len(self.field_chunksize) == 2: - self.chunk_mapping[dim_index] = latvalue dim_index += 1 self.chunk_mapping[dim_index] = lonvalue dim_index += 1 elif len(self.field_chunksize) >= 3: - if depthi >= 0: - # self.chunk_mapping[dim_index] = self.field_chunksize[self.dimensions['depth']] + if timei >= 0 and timevalue > 1 and dtimei >= 0 and dtimevalue > 1 and self._is_dimension_available('time'): + self.chunk_mapping[dim_index] = 1 # still need to make sure that we only load 1 time step at a time + dim_index += 1 + if depthi >= 0 and depthvalue > 1 and ddepthi >= 0 and ddepthvalue > 1 and self._is_dimension_available('depth'): self.chunk_mapping[dim_index] = depthvalue dim_index += 1 self.chunk_mapping[dim_index] = latvalue @@ -1783,28 +1875,28 @@ def _chunkmap_to_chunksize(self): return self.field_chunksize = {} chunk_map = self.chunk_mapping - timei, _, timevalue = self._is_dimension_in_chunksize_request('time') - depthi, _, depthvalue = self._is_dimension_in_chunksize_request('depth') + timei, _, timevalue = self._is_dimension_in_dataset('time') + depthi, _, depthvalue = self._is_dimension_in_dataset('depth') if len(chunk_map) == 2: self.field_chunksize[self.dimensions['lat']] = chunk_map[0] self.field_chunksize[self.dimensions['lon']] = chunk_map[1] elif len(chunk_map) == 3: chunk_dim_index = 0 - if self._is_dimension_available('depth') or (depthi >= 0 and depthvalue > 1): + if depthi >= 0 and depthvalue > 1 and self._is_dimension_available('depth'): self.field_chunksize[self.dimensions['depth']] = chunk_map[chunk_dim_index] chunk_dim_index += 1 - elif self._is_dimension_available('time') or (timei >= 0 and timevalue > 1): + elif timei >= 0 and timevalue > 1 and self._is_dimension_available('time'): self.field_chunksize[self.dimensions['time']] = chunk_map[chunk_dim_index] chunk_dim_index += 1 self.field_chunksize[self.dimensions['lat']] = chunk_map[chunk_dim_index] chunk_dim_index += 1 self.field_chunksize[self.dimensions['lon']] = chunk_map[chunk_dim_index] elif len(chunk_map) >= 4: - self.field_chunksize[self.dimensions['time']] = chunk_map[-1] - self.field_chunksize[self.dimensions['depth']] = chunk_map[0] - self.field_chunksize[self.dimensions['lat']] = chunk_map[1] - self.field_chunksize[self.dimensions['lon']] = chunk_map[2] - dim_index = 3 + self.field_chunksize[self.dimensions['time']] = chunk_map[0] + self.field_chunksize[self.dimensions['depth']] = chunk_map[1] + self.field_chunksize[self.dimensions['lat']] = chunk_map[2] + self.field_chunksize[self.dimensions['lon']] = chunk_map[3] + dim_index = 4 for dim_name in self.dimensions: if dim_name not in ['time', 'depth', 'lat', 'lon']: self.field_chunksize[self.dimensions[dim_name]] = chunk_map[dim_index] @@ -1957,13 +2049,7 @@ def data_access(self): data = data.rechunk(self.field_chunksize) self.chunk_mapping = {} chunkIndex = 0 - timei, _, timevalue = self._is_dimension_in_dataset('time') - depthi, _, depthvalue = self._is_dimension_in_dataset('depth') - has_time = timei >= 0 and timevalue > 1 - has_depth = depthi >= 0 and depthvalue > 1 startblock = 0 - startblock += 1 if has_time and not self._is_dimension_available('time') else 0 - startblock += 1 if has_depth and not self._is_dimension_available('depth') else 0 for chunkDim in data.chunksize[startblock:]: self.chunk_mapping[chunkIndex] = chunkDim chunkIndex += 1 @@ -1976,12 +2062,14 @@ def data_access(self): data = data.rechunk(self.chunk_mapping) self.chunking_finalized = True else: - self.chunking_finalized = True da_data = da.from_array(data, chunks=self.field_chunksize) if self.field_chunksize == 'auto' and da_data.shape[-2:] == da_data.chunksize[-2:]: data = np.array(data) else: data = da_data + if not self.chunking_finalized and self.rechunk_callback_fields is not None: + self.rechunk_callback_fields() + self.chunking_finalized = True return data diff --git a/parcels/fieldset.py b/parcels/fieldset.py index 315085ff1d..454741d1d9 100644 --- a/parcels/fieldset.py +++ b/parcels/fieldset.py @@ -14,6 +14,7 @@ from parcels.tools.converters import TimeConverter, convert_xarray_time_units from parcels.tools.error import TimeExtrapolationError from parcels.tools.loggers import logger +import functools try: from mpi4py import MPI except: @@ -137,7 +138,65 @@ def add_field(self, field, name=None): raise NotImplementedError('FieldLists have been replaced by SummedFields. Use the + operator instead of []') else: setattr(self, name, field) + + if (isinstance(field.data, DeferredArray) or isinstance(field.data, da.core.Array)) and len(self.get_fields()) > 0: + # ==== check for inhabiting the same grid, and homogenise the grid chunking ==== # + g_set = field.grid + grid_chunksize = field.field_chunksize + dFiles = field.dataFiles + is_processed_grid = False + is_same_grid = False + for fld in self.get_fields(): # avoid re-processing/overwriting existing and working fields + if fld.grid == g_set: + is_processed_grid |= True + break + if not is_processed_grid: + for fld in self.get_fields(): + procdims = fld.dimensions + procinds = fld.indices + procpaths = fld.dataFiles + nowpaths = field.dataFiles + if procdims == field.dimensions and procinds == field.indices: + is_same_grid = False + if field.grid.mesh == fld.grid.mesh: + is_same_grid = True + else: + is_same_grid = True + for dim in ['lon', 'lat', 'depth', 'time']: + if dim in field.dimensions.keys() and dim in fld.dimensions.keys(): + is_same_grid &= (field.dimensions[dim] == fld.dimensions[dim]) + fld_g_dims = [fld.grid.tdim, fld.grid.zdim, fld.ydim, fld.xdim] + field_g_dims = [field.grid.tdim, field.grid.zdim, field.grid.ydim, field.grid.xdim] + for i in range(0, len(fld_g_dims)): + is_same_grid &= (field_g_dims[i] == fld_g_dims[i]) + if is_same_grid: + g_set = fld.grid + # ==== check here that the dims of field_chunksize are the same ==== # + if g_set.master_chunksize is not None: + res = False + if (isinstance(field.field_chunksize, tuple) and isinstance(g_set.master_chunksize, tuple)) or (isinstance(field.field_chunksize, dict) and isinstance(g_set.master_chunksize, dict)): + res |= functools.reduce(lambda i, j: i and j, map(lambda m, k: m == k, field.field_chunksize, g_set.master_chunksize), True) + else: + res |= (field.field_chunksize == g_set.master_chunksize) + if res: + grid_chunksize = g_set.master_chunksize + if field.grid.master_chunksize is not None: + logger.warning_once("Trying to initialize a shared grid with different chunking sizes - action prohibited. Replacing requested field_chunksize with grid's master chunksize.") + else: + raise ValueError("Conflict between grids of the same fieldset chunksize and requested field chunksize as well as the chunked name dimensions - Please apply the same chunksize to all fields in a shared grid!") + if procpaths == nowpaths: + dFiles = fld.dataFiles + break + if is_same_grid: + if field.grid != g_set: + field.grid = g_set + if field.field_chunksize != grid_chunksize: + field.field_chunksize = grid_chunksize + if field.dataFiles != dFiles: + field.dataFiles = dFiles + self.gridset.add_grid(field) + field.fieldset = self def add_vector_field(self, vfield): @@ -206,7 +265,8 @@ def parse_wildcards(cls, paths, filenames, var): @classmethod def from_netcdf(cls, filenames, variables, dimensions, indices=None, fieldtype=None, - mesh='spherical', timestamps=None, allow_time_extrapolation=None, time_periodic=False, deferred_load=True, **kwargs): + mesh='spherical', timestamps=None, allow_time_extrapolation=None, time_periodic=False, + deferred_load=True, field_chunksize='auto', **kwargs): """Initialises FieldSet object from NetCDF files :param filenames: Dictionary mapping variables to file(s). The @@ -274,31 +334,48 @@ def from_netcdf(cls, filenames, variables, dimensions, indices=None, fieldtype=N cls.checkvaliddimensionsdict(dims) inds = indices[var] if (indices and var in indices) else indices fieldtype = fieldtype[var] if (fieldtype and var in fieldtype) else fieldtype + chunksize = field_chunksize[var] if (field_chunksize and var in field_chunksize) else field_chunksize grid = None + grid_chunksize = chunksize + dFiles = None # check if grid has already been processed (i.e. if other fields have same filenames, dimensions and indices) for procvar, _ in fields.items(): procdims = dimensions[procvar] if procvar in dimensions else dimensions procinds = indices[procvar] if (indices and procvar in indices) else indices procpaths = filenames[procvar] if isinstance(filenames, dict) and procvar in filenames else filenames nowpaths = filenames[var] if isinstance(filenames, dict) and var in filenames else filenames - if procdims == dims and procinds == inds and procpaths == nowpaths: - sameGrid = False + if procdims == dims and procinds == inds: + processedGrid = False if ((not isinstance(filenames, dict)) or filenames[procvar] == filenames[var]): - sameGrid = True + processedGrid = True elif isinstance(filenames[procvar], dict): - sameGrid = True + processedGrid = True for dim in ['lon', 'lat', 'depth']: if dim in dimensions: - sameGrid *= filenames[procvar][dim] == filenames[var][dim] - if sameGrid: + processedGrid *= filenames[procvar][dim] == filenames[var][dim] + if processedGrid: grid = fields[procvar].grid - kwargs['dataFiles'] = fields[procvar].dataFiles - break + # ==== check here that the dims of field_chunksize are the same ==== # + if grid.master_chunksize is not None: + res = False + if (isinstance(chunksize, tuple) and isinstance(grid.master_chunksize, tuple)) or (isinstance(chunksize, dict) and isinstance(grid.master_chunksize, dict)): + res |= functools.reduce(lambda i, j: i and j, map(lambda m, k: m == k, chunksize, grid.master_chunksize), True) + else: + res |= (chunksize == grid.master_chunksize) + if res: + grid_chunksize = grid.master_chunksize + logger.warning_once("Trying to initialize a shared grid with different chunking sizes - action prohibited. Replacing requested field_chunksize with grid's master chunksize.") + else: + raise ValueError("Conflict between grids of the same fieldset chunksize and requested field chunksize as well as the chunked name dimensions - Please apply the same chunksize to all fields in a shared grid!") + if procpaths == nowpaths: + dFiles = fields[procvar].dataFiles + break fields[var] = Field.from_netcdf(paths, (var, name), dims, inds, grid=grid, mesh=mesh, timestamps=timestamps, allow_time_extrapolation=allow_time_extrapolation, time_periodic=time_periodic, deferred_load=deferred_load, - fieldtype=fieldtype, **kwargs) + fieldtype=fieldtype, field_chunksize=grid_chunksize, dataFiles=dFiles, **kwargs) + u = fields.pop('U', None) v = fields.pop('V', None) return cls(u, v, fields=fields) @@ -306,7 +383,7 @@ def from_netcdf(cls, filenames, variables, dimensions, indices=None, fieldtype=N @classmethod def from_nemo(cls, filenames, variables, dimensions, indices=None, mesh='spherical', allow_time_extrapolation=None, time_periodic=False, - tracer_interp_method='cgrid_tracer', **kwargs): + tracer_interp_method='cgrid_tracer', field_chunksize='auto', **kwargs): """Initialises FieldSet object from NetCDF files of Curvilinear NEMO fields. :param filenames: Dictionary mapping variables to file(s). The @@ -354,13 +431,15 @@ def from_nemo(cls, filenames, variables, dimensions, indices=None, mesh='spheric This flag overrides the allow_time_interpolation and sets it to False :param tracer_interp_method: Method for interpolation of tracer fields. It is recommended to use 'cgrid_tracer' (default) Note that in the case of from_nemo() and from_cgrid(), the velocity fields are default to 'cgrid_velocity' + :param field_chunksize: size of the chunks in dask loading """ if 'creation_log' not in kwargs.keys(): kwargs['creation_log'] = 'from_nemo' fieldset = cls.from_c_grid_dataset(filenames, variables, dimensions, mesh=mesh, indices=indices, time_periodic=time_periodic, - allow_time_extrapolation=allow_time_extrapolation, tracer_interp_method=tracer_interp_method, **kwargs) + allow_time_extrapolation=allow_time_extrapolation, tracer_interp_method=tracer_interp_method, + field_chunksize=field_chunksize, **kwargs) if hasattr(fieldset, 'W'): fieldset.W.set_scaling_factor(-1.) return fieldset @@ -368,7 +447,7 @@ def from_nemo(cls, filenames, variables, dimensions, indices=None, mesh='spheric @classmethod def from_c_grid_dataset(cls, filenames, variables, dimensions, indices=None, mesh='spherical', allow_time_extrapolation=None, time_periodic=False, - tracer_interp_method='cgrid_tracer', **kwargs): + tracer_interp_method='cgrid_tracer', field_chunksize='auto', **kwargs): """Initialises FieldSet object from NetCDF files of Curvilinear NEMO fields. :param filenames: Dictionary mapping variables to file(s). The @@ -416,6 +495,7 @@ def from_c_grid_dataset(cls, filenames, variables, dimensions, indices=None, mes This flag overrides the allow_time_interpolation and sets it to False :param tracer_interp_method: Method for interpolation of tracer fields. It is recommended to use 'cgrid_tracer' (default) Note that in the case of from_nemo() and from_cgrid(), the velocity fields are default to 'cgrid_velocity' + :param field_chunksize: size of the chunks in dask loading """ @@ -434,12 +514,13 @@ def from_c_grid_dataset(cls, filenames, variables, dimensions, indices=None, mes kwargs['creation_log'] = 'from_c_grid_dataset' return cls.from_netcdf(filenames, variables, dimensions, mesh=mesh, indices=indices, time_periodic=time_periodic, - allow_time_extrapolation=allow_time_extrapolation, interp_method=interp_method, **kwargs) + allow_time_extrapolation=allow_time_extrapolation, interp_method=interp_method, + field_chunksize=field_chunksize, **kwargs) @classmethod def from_pop(cls, filenames, variables, dimensions, indices=None, mesh='spherical', allow_time_extrapolation=None, time_periodic=False, - tracer_interp_method='bgrid_tracer', **kwargs): + tracer_interp_method='bgrid_tracer', field_chunksize='auto', **kwargs): """Initialises FieldSet object from NetCDF files of POP fields. It is assumed that the velocities in the POP fields is in cm/s. @@ -489,13 +570,15 @@ def from_pop(cls, filenames, variables, dimensions, indices=None, mesh='spherica This flag overrides the allow_time_interpolation and sets it to False :param tracer_interp_method: Method for interpolation of tracer fields. It is recommended to use 'bgrid_tracer' (default) Note that in the case of from_pop() and from_bgrid(), the velocity fields are default to 'bgrid_velocity' + :param field_chunksize: size of the chunks in dask loading """ if 'creation_log' not in kwargs.keys(): kwargs['creation_log'] = 'from_pop' fieldset = cls.from_b_grid_dataset(filenames, variables, dimensions, mesh=mesh, indices=indices, time_periodic=time_periodic, - allow_time_extrapolation=allow_time_extrapolation, tracer_interp_method=tracer_interp_method, **kwargs) + allow_time_extrapolation=allow_time_extrapolation, tracer_interp_method=tracer_interp_method, + field_chunksize=field_chunksize, **kwargs) if hasattr(fieldset, 'U'): fieldset.U.set_scaling_factor(0.01) # cm/s to m/s if hasattr(fieldset, 'V'): @@ -507,7 +590,7 @@ def from_pop(cls, filenames, variables, dimensions, indices=None, mesh='spherica @classmethod def from_b_grid_dataset(cls, filenames, variables, dimensions, indices=None, mesh='spherical', allow_time_extrapolation=None, time_periodic=False, - tracer_interp_method='bgrid_tracer', **kwargs): + tracer_interp_method='bgrid_tracer', field_chunksize='auto', **kwargs): """Initialises FieldSet object from NetCDF files of Bgrid fields. :param filenames: Dictionary mapping variables to file(s). The @@ -555,6 +638,7 @@ def from_b_grid_dataset(cls, filenames, variables, dimensions, indices=None, mes This flag overrides the allow_time_interpolation and sets it to False :param tracer_interp_method: Method for interpolation of tracer fields. It is recommended to use 'bgrid_tracer' (default) Note that in the case of from_pop() and from_bgrid(), the velocity fields are default to 'bgrid_velocity' + :param field_chunksize: size of the chunks in dask loading """ @@ -575,11 +659,13 @@ def from_b_grid_dataset(cls, filenames, variables, dimensions, indices=None, mes kwargs['creation_log'] = 'from_b_grid_dataset' return cls.from_netcdf(filenames, variables, dimensions, mesh=mesh, indices=indices, time_periodic=time_periodic, - allow_time_extrapolation=allow_time_extrapolation, interp_method=interp_method, **kwargs) + allow_time_extrapolation=allow_time_extrapolation, interp_method=interp_method, + field_chunksize=field_chunksize, **kwargs) @classmethod def from_parcels(cls, basename, uvar='vozocrtx', vvar='vomecrty', indices=None, extra_fields=None, - allow_time_extrapolation=None, time_periodic=False, deferred_load=True, **kwargs): + allow_time_extrapolation=None, time_periodic=False, deferred_load=True, + field_chunksize='auto', **kwargs): """Initialises FieldSet data from NetCDF files using the Parcels FieldSet.write() conventions. :param basename: Base name of the file(s); may contain @@ -600,6 +686,8 @@ def from_parcels(cls, basename, uvar='vozocrtx', vvar='vomecrty', indices=None, fully load them (default: True). It is advised to deferred load the data, since in that case Parcels deals with a better memory management during particle set execution. deferred_load=False is however sometimes necessary for plotting the fields. + :param field_chunksize: size of the chunks in dask loading + """ if extra_fields is None: @@ -618,7 +706,8 @@ def from_parcels(cls, basename, uvar='vozocrtx', vvar='vomecrty', indices=None, for v in extra_fields.keys()]) return cls.from_netcdf(filenames, indices=indices, variables=extra_fields, dimensions=dimensions, allow_time_extrapolation=allow_time_extrapolation, - time_periodic=time_periodic, deferred_load=deferred_load, **kwargs) + time_periodic=time_periodic, deferred_load=deferred_load, + field_chunksize=field_chunksize, **kwargs) @classmethod def from_xarray_dataset(cls, ds, variables, dimensions, mesh='spherical', allow_time_extrapolation=None, diff --git a/parcels/grid.py b/parcels/grid.py index 260042bea4..342fb3f445 100644 --- a/parcels/grid.py +++ b/parcels/grid.py @@ -63,6 +63,7 @@ def __init__(self, lon, lat, time, time_origin, mesh): self.periods = 0 self.load_chunk = [] self.chunk_info = None + self.master_chunksize = None self._add_last_periodic_data_timestep = False @staticmethod diff --git a/parcels/gridset.py b/parcels/gridset.py index 7d7b182199..c43bd31e43 100644 --- a/parcels/gridset.py +++ b/parcels/gridset.py @@ -1,4 +1,6 @@ import numpy as np +import functools +from parcels.tools.loggers import logger __all__ = ['GridSet'] @@ -15,20 +17,46 @@ def add_grid(self, field): grid = field.grid existing_grid = False for g in self.grids: + if field.field_chunksize != grid.master_chunksize: + logger.warning_once("Field chunksize and Grid master chunksize are not equal - erroneous behaviour expected.") + break + if g == grid: + existing_grid = True + break sameGrid = True + sameDims = True if grid.time_origin != g.time_origin: + sameDims = False continue for attr in ['lon', 'lat', 'depth', 'time']: gattr = getattr(g, attr) gridattr = getattr(grid, attr) if gattr.shape != gridattr.shape or not np.allclose(gattr, gridattr): sameGrid = False + sameDims = False break - if not sameGrid: + if not sameDims: continue - existing_grid = True - field.grid = g - break + sameGrid &= (grid.master_chunksize == g.master_chunksize) or (grid.master_chunksize in [False, None] and g.master_chunksize in [False, None]) + if not sameGrid and sameDims and grid.master_chunksize is not None: + print(field.field_chunksize) + print(grid.master_chunksize) + print(g.master_chunksize) + res = False + if (isinstance(grid.master_chunksize, tuple) and isinstance(g.master_chunksize, tuple)) or \ + (isinstance(grid.master_chunksize, dict) and isinstance(g.master_chunksize, dict)): + res |= functools.reduce(lambda i, j: i and j, + map(lambda m, k: m == k, grid.master_chunksize, g.master_chunksize), True) + if res: + sameGrid = True + logger.warning_once("Trying to initialize a shared grid with different chunking sizes - action prohibited. Replacing requested field_chunksize with grid's master chunksize.") + else: + raise ValueError("Conflict between grids of the same gridset: major grid chunksize and requested sibling-grid chunksize as well as their chunk-dimension names are not equal - Please apply the same chunksize to all fields in a shared grid!") + break + if sameGrid: + existing_grid = True + field.grid = g + break if not existing_grid: self.grids.append(grid) diff --git a/parcels/kernel.py b/parcels/kernel.py index 9ae2b58b40..e017167c27 100644 --- a/parcels/kernel.py +++ b/parcels/kernel.py @@ -273,7 +273,7 @@ def execute_python(self, pset, endtime, dt): # Don't execute particles that aren't started yet sign_end_part = np.sign(endtime - particles.time) - + # Compute min/max dt for first timestep dt_pos = min(abs(particles.dt), abs(endtime - particles.time)) # ==== numerically stable; also making sure that continuously-recovered particles do end successfully, diff --git a/parcels/particle.py b/parcels/particle.py index e7d1e5d379..e0d2c90fd8 100644 --- a/parcels/particle.py +++ b/parcels/particle.py @@ -7,9 +7,10 @@ from parcels.tools.error import ErrorCode from parcels.tools.loggers import logger - __all__ = ['ScipyParticle', 'JITParticle', 'Variable'] +indicators_64bit = [np.float64, np.int64, c_void_p] + class Variable(object): """Descriptor class that delegates data access to particle data @@ -47,8 +48,7 @@ def __repr__(self): def is64bit(self): """Check whether variable is 64-bit""" - return True if self.dtype == np.float64 or self.dtype == np.int64 \ - or self.dtype == c_void_p else False + return True if self.dtype in indicators_64bit else False class ParticleType(object): diff --git a/parcels/scripts/get_examples.py b/parcels/scripts/get_examples.py index de6437747f..a7aa0863ad 100644 --- a/parcels/scripts/get_examples.py +++ b/parcels/scripts/get_examples.py @@ -37,10 +37,13 @@ "U_purely_zonal-ORCA025_grid_U.nc4", "V_purely_zonal-ORCA025_grid_V.nc4", "mesh_mask.nc4"]] + ["NemoNorthSeaORCA025-N006_data/" + fn for fn in [ - "ORCA025-N06_20000104d05U.nc", "ORCA025-N06_20000109d05U.nc", - "ORCA025-N06_20000104d05V.nc", "ORCA025-N06_20000109d05V.nc", - "ORCA025-N06_20000104d05W.nc", "ORCA025-N06_20000109d05W.nc", + "ORCA025-N06_20000104d05U.nc", "ORCA025-N06_20000109d05U.nc", "ORCA025-N06_20000114d05U.nc", "ORCA025-N06_20000119d05U.nc", "ORCA025-N06_20000124d05U.nc", "ORCA025-N06_20000129d05U.nc", + "ORCA025-N06_20000104d05V.nc", "ORCA025-N06_20000109d05V.nc", "ORCA025-N06_20000114d05V.nc", "ORCA025-N06_20000119d05V.nc", "ORCA025-N06_20000124d05V.nc", "ORCA025-N06_20000129d05V.nc", + "ORCA025-N06_20000104d05W.nc", "ORCA025-N06_20000109d05W.nc", "ORCA025-N06_20000114d05W.nc", "ORCA025-N06_20000119d05W.nc", "ORCA025-N06_20000124d05W.nc", "ORCA025-N06_20000129d05W.nc", "coordinates.nc"]] + + ["POPSouthernOcean_data/" + fn for fn in ["t.x1_SAMOC_flux.169000.nc", "t.x1_SAMOC_flux.169001.nc", + "t.x1_SAMOC_flux.169002.nc", "t.x1_SAMOC_flux.169003.nc", + "t.x1_SAMOC_flux.169004.nc", "t.x1_SAMOC_flux.169005.nc"]] + ["WOA_data/" + fn for fn in ["woa18_decav_t%.2d_04.nc" % m for m in range(1, 13)]]) diff --git a/tests/test_fieldset.py b/tests/test_fieldset.py index b9c4728b31..baee8d53d7 100644 --- a/tests/test_fieldset.py +++ b/tests/test_fieldset.py @@ -2,6 +2,7 @@ from parcels.field import Field, VectorField from parcels.tools.converters import TimeConverter, _get_cftime_calendars, _get_cftime_datetimes, UnitConverter, GeographicPolar import dask.array as da +import dask from datetime import timedelta as delta import datetime import numpy as np @@ -239,6 +240,90 @@ def test_add_duplicate_field(dupobject): assert error_thrown +def test_fieldset_samegrids_from_file(tmpdir, filename='test_subsets'): + """ Test for subsetting fieldset from file using indices dict. """ + data, dimensions = generate_fieldset(100, 100) + filepath1 = tmpdir.join(filename+'_1') + fieldset1 = FieldSet.from_data(data, dimensions) + fieldset1.write(filepath1) + + ufiles = [filepath1+'U.nc', ] * 4 + vfiles = [filepath1+'V.nc', ] * 4 + timestamps = np.arange(0, 4, 1) * 86400.0 + timestamps = np.expand_dims(timestamps, 1) + files = {'U': ufiles, 'V': vfiles} + variables = {'U': 'vozocrtx', 'V': 'vomecrty'} + dimensions = {'lon': 'nav_lon', 'lat': 'nav_lat'} + chs = 'auto' + fieldset = FieldSet.from_netcdf(files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True, field_chunksize=chs) + + assert fieldset.gridset.size == 1 + assert fieldset.U.grid == fieldset.V.grid + assert fieldset.U.grid.master_chunksize == fieldset.V.grid.master_chunksize + assert fieldset.U.field_chunksize == fieldset.V.field_chunksize + + +def test_fieldset_diffgrids_from_file(tmpdir, filename='test_subsets'): + """ Test for subsetting fieldset from file using indices dict. """ + data, dimensions = generate_fieldset(100, 100) + filepath1 = tmpdir.join(filename+'_1') + fieldset1 = FieldSet.from_data(data, dimensions) + fieldset1.write(filepath1) + data, dimensions = generate_fieldset(50, 50) + filepath2 = tmpdir.join(filename + '_2') + fieldset2 = FieldSet.from_data(data, dimensions) + fieldset2.write(filepath2) + + ufiles = [filepath1+'U.nc', ] * 4 + vfiles = [filepath2+'V.nc', ] * 4 + timestamps = np.arange(0, 4, 1) * 86400.0 + timestamps = np.expand_dims(timestamps, 1) + files = {'U': ufiles, 'V': vfiles} + variables = {'U': 'vozocrtx', 'V': 'vomecrty'} + dimensions = {'lon': 'nav_lon', 'lat': 'nav_lat'} + chs = 'auto' + + fieldset = FieldSet.from_netcdf(files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True, field_chunksize=chs) + assert fieldset.gridset.size == 2 + assert fieldset.U.grid != fieldset.V.grid + + +def test_fieldset_diffgrids_from_file_data(tmpdir, filename='test_subsets'): + """ Test for subsetting fieldset from file using indices dict. """ + data, dimensions = generate_fieldset(100, 100) + filepath = tmpdir.join(filename) + fieldset_data = FieldSet.from_data(data, dimensions) + fieldset_data.write(filepath) + field_data = fieldset_data.U + field_data.name = "B" + + ufiles = [filepath+'U.nc', ] * 4 + vfiles = [filepath+'V.nc', ] * 4 + timestamps = np.arange(0, 4, 1) * 86400.0 + timestamps = np.expand_dims(timestamps, 1) + files = {'U': ufiles, 'V': vfiles} + variables = {'U': 'vozocrtx', 'V': 'vomecrty'} + dimensions = {'lon': 'nav_lon', 'lat': 'nav_lat'} + chs = 'auto' + fieldset_file = FieldSet.from_netcdf(files, variables, dimensions, timestamps=timestamps, allow_time_extrapolation=True, field_chunksize=chs) + + fieldset_file.add_field(field_data, "B") + assert len(fieldset_file.get_fields()) == 3 + assert fieldset_file.gridset.size == 2 + assert fieldset_file.U.grid != fieldset_file.B.grid + + +def test_fieldset_samegrids_from_data(tmpdir, filename='test_subsets'): + """ Test for subsetting fieldset from file using indices dict. """ + data, dimensions = generate_fieldset(100, 100) + fieldset1 = FieldSet.from_data(data, dimensions) + field_data = fieldset1.U + field_data.name = "B" + fieldset1.add_field(field_data, "B") + assert fieldset1.gridset.size == 1 + assert fieldset1.U.grid == fieldset1.B.grid + + @pytest.mark.parametrize('mesh', ['flat', 'spherical']) def test_fieldset_celledgesizes(mesh): data, dimensions = generate_fieldset(10, 7) @@ -354,6 +439,11 @@ def test_vector_fields(mode, swapUV): @pytest.mark.parametrize('field_chunksize', [False, 'auto', (1, 32, 32)]) @pytest.mark.parametrize('with_GC', [False, True]) def test_from_netcdf_memory_containment(mode, time_periodic, field_chunksize, with_GC): + if field_chunksize == 'auto': + dask.config.set({'array.chunk-size': '2MiB'}) + else: + dask.config.set({'array.chunk-size': '128MiB'}) + class PerformanceLog(): samples = [] memory_steps = [] @@ -432,7 +522,7 @@ def test_from_netcdf_field_chunking(mode, time_periodic, field_chunksize, deferL @pytest.mark.parametrize('datetype', ['float', 'datetime64']) -def test_timestaps(datetype, tmpdir): +def test_timestamps(datetype, tmpdir): data1, dims1 = generate_fieldset(10, 10, 1, 10) data2, dims2 = generate_fieldset(10, 10, 1, 4) if datetype == 'float':