19 changes: 12 additions & 7 deletions parcels/field.py
@@ -179,8 +179,8 @@ def __init__(self, name, data, lon=None, lat=None, depth=None, time=None, grid=N
# (data_full_zdim = grid.zdim if no indices are used, for A- and C-grids and for some B-grids). It is used for the B-grid,
# since some datasets do not provide the deeper level of data (which is ignored by the interpolation).
self.data_full_zdim = kwargs.pop('data_full_zdim', None)
self.data_chunks = []
self.c_data_chunks = []
self.data_chunks = [] # data buffers for the raw data loaded by the FileBuffer - shall be a list of C-contiguous arrays
self.c_data_chunks = [] # C pointers into the data_chunks arrays
self.nchunks = []
self.chunk_set = False
self.filebuffers = [None] * 3
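
A note on why this PR pushes C-contiguity everywhere: the JIT kernels receive raw buffer pointers via ctypes, and a Fortran-ordered or strided array would be walked with the wrong element order on the C side. A minimal illustration of the failure mode (standalone, not part of the diff):

```python
import numpy as np

a = np.arange(12, dtype=np.float32).reshape(3, 4)
f = np.asfortranarray(a)           # same values, column-major memory layout
assert not f.flags['C_CONTIGUOUS']

# A C kernel walks the raw buffer row-major; order='K' shows memory order:
print(np.ravel(a, order='K'))      # [ 0.  1.  2. ...] - what the C side expects
print(np.ravel(f, order='K'))      # [ 0.  4.  8.  1. ...] - silently transposed

c = np.ascontiguousarray(f)        # copies only if needed
assert c.flags['C_CONTIGUOUS']
```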
@@ -990,7 +990,8 @@ def chunk_setup(self):

self.data_chunks = [None] * npartitions
self.c_data_chunks = [None] * npartitions
self.grid.load_chunk = np.zeros(npartitions, dtype=c_int)
# self.grid.load_chunk = np.zeros(npartitions, dtype=c_int)
self.grid.load_chunk = np.zeros(npartitions, dtype=c_int, order='C')
# self.grid.chunk_info format: number of dimensions (without tdim); number of chunks per dimension;
# chunk sizes (the sizes of all chunks along dim[0], then likewise for the next dims)
self.grid.chunk_info = [[len(self.nchunks)-1], list(self.nchunks[1:]), sum(list(list(ci) for ci in chunks[1:]), [])]
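
For a concrete reading of the chunk_info layout built here, consider a hypothetical field whose dask array is chunked as time (5,), y (10, 10), x (64, 64, 32):

```python
# Hypothetical chunking: time (5,), y (10, 10), x (64, 64, 32)
nchunks = (1, 2, 3)                       # number of chunks per dim, tdim first
chunks = ((5,), (10, 10), (64, 64, 32))   # per-dim chunk sizes

chunk_info = [[len(nchunks) - 1],                        # dims without tdim -> [2]
              list(nchunks[1:]),                         # chunks per spatial dim -> [2, 3]
              sum((list(ci) for ci in chunks[1:]), [])]  # flattened sizes -> [10, 10, 64, 64, 32]
print(chunk_info)  # [[2], [2, 3], [10, 10, 64, 64, 32]]
```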
@@ -1009,7 +1010,8 @@ def chunk_data(self):
for block_id in range(len(self.grid.load_chunk)):
if self.grid.load_chunk[block_id] == 1 or self.grid.load_chunk[block_id] > 1 and self.data_chunks[block_id] is None:
block = self.get_block(block_id)
self.data_chunks[block_id] = np.array(self.data.blocks[(slice(self.grid.tdim),) + block])
# self.data_chunks[block_id] = np.array(self.data.blocks[(slice(self.grid.tdim),) + block])
self.data_chunks[block_id] = np.array(self.data.blocks[(slice(self.grid.tdim),) + block], order='C')
elif self.grid.load_chunk[block_id] == 0:
if isinstance(self.data_chunks, list):
self.data_chunks[block_id] = None
@@ -1022,8 +1024,10 @@ def chunk_data(self):
else:
self.data_chunks[0, :] = None
self.c_data_chunks[0] = None

self.grid.load_chunk[0] = 2
self.data_chunks[0] = np.array(self.data)
# self.data_chunks[0] = np.array(self.data)
self.data_chunks[0] = np.array(self.data, order='C')

@property
def ctypes_struct(self):
@@ -1046,8 +1050,9 @@ class CField(Structure):
if self.grid.load_chunk[i] == 1:
raise ValueError('data_chunks should have been loaded by now if requested. grid.load_chunk[bid] cannot be 1')
if self.grid.load_chunk[i] > 1:
if not self.data_chunks[i].flags.c_contiguous:
self.data_chunks[i] = self.data_chunks[i].copy()
if not self.data_chunks[i].flags['C_CONTIGUOUS']:
# self.data_chunks[i] = self.data_chunks[i].copy()
self.data_chunks[i] = np.array(self.data_chunks[i], order='C')
self.c_data_chunks[i] = self.data_chunks[i].ctypes.data_as(POINTER(POINTER(c_float)))
else:
self.c_data_chunks[i] = None
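
The guard-then-point idiom above (ensure contiguity, then take `ctypes.data_as`) is the standard way to hand a NumPy buffer to C. A simplified sketch with a single-level pointer (the PR itself uses `POINTER(POINTER(c_float))`; the helper name is hypothetical):

```python
from ctypes import POINTER, c_float

import numpy as np

def as_c_float_ptr(arr):
    """Return a C-contiguous float32 copy/view of arr plus a C pointer to it.

    The returned array must be kept alive for as long as the C side holds
    the pointer - data_as() does not own the buffer.
    """
    if arr.dtype != np.float32 or not arr.flags['C_CONTIGUOUS']:
        arr = np.array(arr, dtype=np.float32, order='C')
    return arr, arr.ctypes.data_as(POINTER(c_float))

data = np.asfortranarray(np.ones((4, 4), dtype=np.float32))
data, ptr = as_c_float_ptr(data)  # copies here, since data was F-ordered
```

Keeping the array alive is presumably why `data_chunks` stores the arrays alongside the `c_data_chunks` pointers.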
30 changes: 22 additions & 8 deletions parcels/grid.py
@@ -35,9 +35,15 @@ class Grid(object):
"""

def __init__(self, lon, lat, time, time_origin, mesh):
self.lon = lon
self.lat = lat
self.time = np.zeros(1, dtype=np.float64) if time is None else time
self.lon = lon # should be a C-contiguous array of floats
if not self.lon.flags['C_CONTIGUOUS']:
self.lon = np.array(self.lon, order='C')
self.lat = lat # should be a C-contiguous array of floats
if not self.lat.flags['C_CONTIGUOUS']:
self.lat = np.array(self.lat, order='C')
self.time = np.zeros(1, dtype=np.float64) if time is None else time # should be a C-contiguous array of float64
if not self.time.flags['C_CONTIGUOUS']:
self.time = np.array(self.time, order='C')
if not self.lon.dtype == np.float32:
logger.warning_once("Casting lon data to np.float32")
self.lon = self.lon.astype(np.float32)
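
The ensure-C-contiguous check now repeats for lon, lat, time, and all four depth variants below. If one wanted to deduplicate, `np.ascontiguousarray` folds the guard and the copy into one call (it is a no-op on already-contiguous input, unlike an unconditional `np.array(..., order='C')`, which always copies). A refactor sketch, not part of the PR, and minus the cast warnings:

```python
import numpy as np

def _ensure_c_array(arr, dtype=np.float32):
    """Return arr as a C-contiguous array of dtype, copying only when needed."""
    return np.ascontiguousarray(arr, dtype=dtype)

# e.g. in Grid.__init__:
#   self.lon = _ensure_c_array(lon)
#   self.lat = _ensure_c_array(lat)
#   self.time = _ensure_c_array(np.zeros(1) if time is None else time, np.float64)
```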
@@ -61,7 +67,7 @@ def __init__(self, lon, lat, time, time_origin, mesh):
self.defer_load = False
self.lonlat_minmax = np.array([np.nanmin(lon), np.nanmax(lon), np.nanmin(lat), np.nanmax(lat)], dtype=np.float32)
self.periods = 0
self.load_chunk = []
self.load_chunk = [] # should be a C-contiguous array of ints
self.chunk_info = None
self.master_chunksize = None
self._add_last_periodic_data_timestep = False
@@ -303,7 +309,9 @@ def __init__(self, lon, lat, depth=None, time=None, time_origin=None, mesh='flat
assert(len(depth.shape) == 1), 'depth is not a vector'

self.gtype = GridCode.RectilinearZGrid
self.depth = np.zeros(1, dtype=np.float32) if depth is None else depth
self.depth = np.zeros(1, dtype=np.float32) if depth is None else depth # should be a C-contiguous array of floats
if not self.depth.flags['C_CONTIGUOUS']:
self.depth = np.array(self.depth, order='C')
self.zdim = self.depth.size
self.z4d = -1 # only used in RectilinearSGrid
if not self.depth.dtype == np.float32:
@@ -339,7 +347,9 @@ def __init__(self, lon, lat, depth, time=None, time_origin=None, mesh='flat'):
assert(isinstance(depth, np.ndarray) and len(depth.shape) in [3, 4]), 'depth is not a 3D or 4D numpy array'

self.gtype = GridCode.RectilinearSGrid
self.depth = depth
self.depth = depth # should be a C-contiguous array of floats
if not self.depth.flags['C_CONTIGUOUS']:
self.depth = np.array(self.depth, order='C')
self.zdim = self.depth.shape[-3]
self.z4d = len(self.depth.shape) == 4
if self.z4d:
@@ -435,7 +445,9 @@ def __init__(self, lon, lat, depth=None, time=None, time_origin=None, mesh='flat
assert(len(depth.shape) == 1), 'depth is not a vector'

self.gtype = GridCode.CurvilinearZGrid
self.depth = np.zeros(1, dtype=np.float32) if depth is None else depth
self.depth = np.zeros(1, dtype=np.float32) if depth is None else depth # should be a C-contiguous array of floats
if not self.depth.flags['C_CONTIGUOUS']:
self.depth = np.array(self.depth, order='C')
self.zdim = self.depth.size
self.z4d = -1 # only for SGrid
if not self.depth.dtype == np.float32:
@@ -470,7 +482,9 @@ def __init__(self, lon, lat, depth, time=None, time_origin=None, mesh='flat'):
assert(isinstance(depth, np.ndarray) and len(depth.shape) in [3, 4]), 'depth is not a 3D or 4D numpy array'

self.gtype = GridCode.CurvilinearSGrid
self.depth = depth
self.depth = depth # should be a C-contiguous array of floats
if not self.depth.flags['C_CONTIGUOUS']:
self.depth = np.array(self.depth, order='C')
self.zdim = self.depth.shape[-3]
self.z4d = len(self.depth.shape) == 4
if self.z4d:
60 changes: 36 additions & 24 deletions parcels/kernel.py
@@ -240,13 +240,17 @@ def execute_jit(self, pset, endtime, dt):
g.load_chunk = np.where(g.load_chunk == 1, 2, g.load_chunk)
if len(g.load_chunk) > 0: # not the case if a field is not called in the kernel
if not g.load_chunk.flags.c_contiguous:
g.load_chunk = g.load_chunk.copy()
# g.load_chunk = g.load_chunk.copy()
g.load_chunk = np.array(g.load_chunk, order='C')
if not g.depth.flags.c_contiguous:
g.depth = g.depth.copy()
# g.depth = g.depth.copy()
g.depth = np.array(g.depth, order='C')
if not g.lon.flags.c_contiguous:
g.lon = g.lon.copy()
# g.lon = g.lon.copy()
g.lon = np.array(g.lon, order='C')
if not g.lat.flags.c_contiguous:
g.lat = g.lat.copy()
# g.lat = g.lat.copy()
g.lat = np.array(g.lat, order='C')

fargs = [byref(f.ctypes_struct) for f in self.field_args.values()]
fargs += [c_double(f) for f in self.const_args.values()]
@@ -343,12 +347,15 @@ def execute_python(self, pset, endtime, dt):
break

def remove_deleted(self, pset, output_file, endtime):
"""Utility to remove all particles that signalled deletion"""
indices = [i for i, p in enumerate(pset.particles)
if p.state in [ErrorCode.Delete]]
if len(indices) > 0 and output_file is not None:
output_file.write(pset[indices], endtime, deleted_only=True)
pset.remove(indices)
"""Utility to remove all particles that signalled deletion"""
states = np.array([p.state for p in pset.particles])
delstates = states == int(ErrorCode.Delete)
indices = np.nonzero(delstates)
n_deleted = np.count_nonzero(delstates)
if n_deleted > 0 and output_file is not None:
output_file.write(pset[indices], endtime, deleted_only=True)
pset.remove(indices)
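
One subtlety in the vectorized rewrite: `np.nonzero` returns a tuple of index arrays, not a flat array, so `pset[indices]` and `pset.remove(indices)` must cope with that shape (or the code would take `indices[0]`). A standalone sketch of the filtering logic, with a hypothetical numeric value for `ErrorCode.Delete`:

```python
import numpy as np

DELETE = 4  # hypothetical numeric value of ErrorCode.Delete

states = np.array([0, 4, 0, 4, 1])        # per-particle status codes
delstates = states == DELETE
indices = np.nonzero(delstates)[0]        # note the [0]: nonzero returns a tuple
print(indices)                            # [1 3]
print(np.count_nonzero(delstates))        # 2
```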


def execute(self, pset, endtime, dt, recovery=None, output_file=None, execute_once=False):
"""Execute this Kernel over a ParticleSet for several timesteps"""
@@ -358,14 +365,6 @@ def execute(self, pset, endtime, dt, recovery=None, output_file=None, execute_on
if abs(dt) < 1e-6 and not execute_once:
logger.warning_once("'dt' is too small, causing numerical accuracy limit problems. Please chose a higher 'dt' and rather scale the 'time' axis of the field accordingly. (related issue #762)")

# def remove_deleted(pset):
# """Utility to remove all particles that signalled deletion"""
# indices = [i for i, p in enumerate(pset.particles)
# if p.state in [ErrorCode.Delete]]
# if len(indices) > 0 and output_file is not None:
# output_file.write(pset[indices], endtime, deleted_only=True)
# pset.remove(indices)

if recovery is None:
recovery = {}
elif ErrorCode.ErrorOutOfBounds in recovery and ErrorCode.ErrorThroughSurface not in recovery:
@@ -387,17 +386,23 @@ def execute(self, pset, endtime, dt, recovery=None, output_file=None, execute_on
self.remove_deleted(pset, output_file=output_file, endtime=endtime)

# Identify particles that threw errors
error_particles = [p for p in pset.particles if p.state not in [ErrorCode.Success, ErrorCode.Evaluate]]

while len(error_particles) > 0:
states = np.array([p.state for p in pset.particles])
estates = np.zeros(states.shape, dtype=np.bool)
for ecval in {ErrorCode.Success, ErrorCode.Evaluate}:
estates |= states == int(ecval)
estates = np.invert(estates)
error_indices = np.nonzero(estates)
n_errors = np.count_nonzero(estates)
error_particles = pset[error_indices]

while n_errors > 0:
# Apply recovery kernel
for p in error_particles:
if p.state == ErrorCode.StopExecution:
return
if p.state == ErrorCode.Repeat:
p.reset_state()
elif p.state == ErrorCode.Delete:
# p.delete()
pass
elif p.state in recovery_map:
recovery_kernel = recovery_map[p.state]
@@ -407,7 +412,6 @@ def execute(self, pset, endtime, dt, recovery=None, output_file=None, execute_on
p.reset_state()
else:
logger.warning_once('Deleting particle {} because of non-recoverable error'.format(p.id))
# logger.warning('Deleting particle because of bug in #749 and #737 - particle state: {}'.format(ErrorCode.toString(p.state)))
p.delete()

# Remove all particles that signalled deletion
@@ -419,7 +423,15 @@ def execute(self, pset, endtime, dt, recovery=None, output_file=None, execute_on
else:
self.execute_python(pset, endtime, dt)

error_particles = [p for p in pset.particles if p.state not in [ErrorCode.Success, ErrorCode.Evaluate]]
# error_particles = [p for p in pset.particles if p.state not in [ErrorCode.Success, ErrorCode.Evaluate]]
states = np.array([p.state for p in pset.particles])
estates = np.zeros(states.shape, dtype=np.bool)
for ecval in {ErrorCode.Success, ErrorCode.Evaluate}:
estates |= states == int(ecval)
estates = np.invert(estates)
error_indices = np.nonzero(estates)
n_errors = np.count_nonzero(estates)
error_particles = pset[error_indices]
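
The membership mask built up with `estates |=` per status code (here and in the initial scan above) could be written more directly with `np.isin`; note also that `dtype=np.bool` relies on an alias that newer NumPy releases removed in favor of plain `bool`. An equivalent sketch, with hypothetical numeric codes:

```python
import numpy as np

OK_CODES = (0, 1)  # hypothetical values of ErrorCode.Success / ErrorCode.Evaluate

states = np.array([0, 2, 1, 5, 0])
error_mask = ~np.isin(states, OK_CODES)    # True where the state is an error
error_indices = np.nonzero(error_mask)[0]  # -> [1 3]
n_errors = int(error_mask.sum())           # -> 2
```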

def merge(self, kernel):
funcname = self.funcname + kernel.funcname
12 changes: 8 additions & 4 deletions parcels/kernel_benchmark.py
@@ -108,13 +108,17 @@ def execute_jit(self, pset, endtime, dt):
g.load_chunk = np.where(g.load_chunk == 1, 2, g.load_chunk)
if len(g.load_chunk) > 0: # not the case if a field is not called in the kernel
if not g.load_chunk.flags.c_contiguous:
g.load_chunk = g.load_chunk.copy()
# g.load_chunk = g.load_chunk.copy()
g.load_chunk = np.array(g.load_chunk, order='C')
if not g.depth.flags.c_contiguous:
g.depth = g.depth.copy()
# g.depth = g.depth.copy()
g.depth = np.array(g.depth, order='C')
if not g.lon.flags.c_contiguous:
g.lon = g.lon.copy()
# g.lon = g.lon.copy()
g.lon = np.array(g.lon, order='C')
if not g.lat.flags.c_contiguous:
g.lat = g.lat.copy()
# g.lat = g.lat.copy()
g.lat = np.array(g.lat, order='C')

fargs = [byref(f.ctypes_struct) for f in self.field_args.values()]
fargs += [c_double(f) for f in self.const_args.values()]
2 changes: 1 addition & 1 deletion parcels/particleset_benchmark.py
@@ -309,7 +309,7 @@ def execute(self, pyfunc=AdvectionRK4, endtime=None, runtime=None, dt=1.,
# else:
# mem_B_used_total = self.process.memory_info().rss
# mem_B_used_total = self.process.memory_info().rss
mem_B_used_total = 0
# mem_B_used_total = 0
if USE_RUSE_SYNC_MEMLOG:
mem_B_used_total = measure_mem_usage()
else:
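
For context, the branch cut off above reads the resident set size via psutil; the basic pattern is (a sketch, not the PR's exact code):

```python
import os

import psutil

process = psutil.Process(os.getpid())
mem_B_used_total = process.memory_info().rss  # resident set size, in bytes
```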
3 changes: 1 addition & 2 deletions performance/benchmark_CMEMS.py
@@ -153,7 +153,7 @@ def perIterGC():
nowtime = datetime.now()
random.seed(nowtime.microsecond)

branch = "benchmarking"
branch = "engineer"
computer_env = "local/unspecified"
scenario = "CMEMS"
headdir = ""
@@ -366,7 +366,6 @@ def perIterGC():

if MPI:
mpi_comm = MPI.COMM_WORLD
# mpi_comm.Barrier()
Nparticles = mpi_comm.reduce(np.array(pset.nparticle_log.get_params()), op=MPI.SUM, root=0)
Nmem = mpi_comm.reduce(np.array(pset.mem_log.get_params()), op=MPI.SUM, root=0)
if mpi_comm.Get_rank() == 0:
2 changes: 1 addition & 1 deletion performance/benchmark_bickleyjet.py
@@ -230,7 +230,7 @@ def Age(particle, fieldset, time):
nowtime = datetime.datetime.now()
random.seed(nowtime.microsecond)

branch = "benchmarking"
branch = "engineer"
computer_env = "local/unspecified"
scenario = "bickleyjet"
odir = ""
5 changes: 2 additions & 3 deletions performance/benchmark_deep_migration_NPacific.py
@@ -188,7 +188,7 @@ def DeleteParticle(particle, fieldset, time):
def perIterGC():
gc.collect()

def getclosest_ij(lats,lons,latpt,lonpt):
def getclosest_ij(lats,lons,latpt,lonpt):
"""Function to find the index of the closest point to a certain lon/lat value."""
dist_sq = (lats-latpt)**2 + (lons-lonpt)**2 # find squared distance of every point on grid
minindex_flattened = dist_sq.argmin() # 1D index of minimum dist_sq element
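
The hunk cuts off before the function's return; for reference, the usual completion of this recipe maps the flat argmin back to 2-D indices (a sketch, assuming lats and lons are 2-D coordinate arrays):

```python
import numpy as np

def getclosest_ij(lats, lons, latpt, lonpt):
    """Find the (i, j) index of the grid point closest to (latpt, lonpt)."""
    dist_sq = (lats - latpt)**2 + (lons - lonpt)**2   # squared distance at every grid point
    minindex_flattened = dist_sq.argmin()             # 1D index of the minimum
    return np.unravel_index(minindex_flattened, lats.shape)
```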
Expand Down Expand Up @@ -240,7 +240,7 @@ def Profiles(particle, fieldset, time):
time_in_days = int(float(eval(args.time_in_days)))
with_GC = args.useGC

branch = "benchmarking"
branch = "engineer"
computer_env = "local/unspecified"
scenario = "deep_migration"
headdir = ""
@@ -442,7 +442,6 @@ def Profiles(particle, fieldset, time):

if MPI:
mpi_comm = MPI.COMM_WORLD
# mpi_comm.Barrier()
Nparticles = mpi_comm.reduce(np.array(pset.nparticle_log.get_params()), op=MPI.SUM, root=0)
Nmem = mpi_comm.reduce(np.array(pset.mem_log.get_params()), op=MPI.SUM, root=0)
if mpi_comm.Get_rank() == 0:
2 changes: 1 addition & 1 deletion performance/benchmark_doublegyre.py
@@ -224,7 +224,7 @@ def Age(particle, fieldset, time):
nowtime = datetime.datetime.now()
random.seed(nowtime.microsecond)

branch = "benchmarking"
branch = "engineer"
computer_env = "local/unspecified"
scenario = "doublegyre"
odir = ""
3 changes: 1 addition & 2 deletions performance/benchmark_galapagos_backwards.py
@@ -107,7 +107,7 @@ def periodicBC(particle, fieldSet, time):
time_in_days = int(float(eval(args.time_in_days)))
with_GC = args.useGC

branch = "benchmarking"
branch = "engineer"
computer_env = "local/unspecified"
scenario = "galapagos_backwards"
headdir = ""
@@ -208,7 +208,6 @@ def periodicBC(particle, fieldSet, time):

if MPI:
mpi_comm = MPI.COMM_WORLD
# mpi_comm.Barrier()
Nparticles = mpi_comm.reduce(np.array(pset.nparticle_log.get_params()), op=MPI.SUM, root=0)
Nmem = mpi_comm.reduce(np.array(pset.mem_log.get_params()), op=MPI.SUM, root=0)
if mpi_comm.Get_rank() == 0:
3 changes: 1 addition & 2 deletions performance/benchmark_palaeo_Y2K.py
@@ -263,7 +263,7 @@ class DinoParticle(JITParticle):
time_in_years = int(time_in_days/366.0)
with_GC = args.useGC

branch = "benchmarking"
branch = "engineer"
computer_env = "local/unspecified"
scenario = "palaeo-parcels"
headdir = ""
@@ -437,7 +437,6 @@ class DinoParticle(JITParticle):

if MPI:
mpi_comm = MPI.COMM_WORLD
# mpi_comm.Barrier()
Nparticles = mpi_comm.reduce(np.array(pset.nparticle_log.get_params()), op=MPI.SUM, root=0)
Nmem = mpi_comm.reduce(np.array(pset.mem_log.get_params()), op=MPI.SUM, root=0)
if mpi_comm.Get_rank() == 0: