Skip to content

Commit

Permalink
fixes for lint inspection issues
Browse files Browse the repository at this point in the history
  • Loading branch information
monocongo committed Mar 12, 2019
1 parent 7880fc2 commit 5d309ab
Showing 1 changed file with 82 additions and 46 deletions.
128 changes: 82 additions & 46 deletions climate_indices/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,8 @@ def drop_data_into_shared_arrays_grid(dataset,
periodicity,
data_start_year):

output_shape = None

# get the data arrays we'll use later in the index computations
global _global_shared_arrays
expected_dims_3d = (("lat", "lon", "time"), ("lon", "lat", "time"))
Expand Down Expand Up @@ -611,6 +613,8 @@ def drop_data_into_shared_arrays_grid(dataset,
def drop_data_into_shared_arrays_divisions(dataset,
var_names):

output_shape = None

# get the data arrays we'll use later in the index computations
global _global_shared_arrays
expected_dims_2d = [("division", "time"), ("time", "division")]
Expand Down Expand Up @@ -706,13 +710,11 @@ def _compute_write_index(keyword_arguments):
# the shape of output variables is assumed to match that of the input,
# so use either precipitation or temperature variable's shape
if "var_name_precip" in keyword_arguments:
output_shape = dataset[keyword_arguments["var_name_precip"]].shape
output_dims = dataset[keyword_arguments["var_name_precip"]].dims
elif "var_name_temp" in keyword_arguments:
output_shape = dataset[keyword_arguments["var_name_temp"]].shape
output_dims = dataset[keyword_arguments["var_name_temp"]].dims
else:
raise ValueError("Unable to determine output shape, no precipitation "
raise ValueError("Unable to determine output dimensions, no precipitation "
"or temperature variable name was specified.")

# convert data into the appropriate units, if necessary
Expand Down Expand Up @@ -774,22 +776,19 @@ def _compute_write_index(keyword_arguments):
if keyword_arguments["index"] == "palmers":

# read AWC data into shared memory array
if ("netcdf_awc" not in keyword_arguments) or (
"var_name_awc" not in keyword_arguments
):
if ("netcdf_awc" not in keyword_arguments) or \
("var_name_awc" not in keyword_arguments):
raise ValueError("Missing the AWC file and/or variable name argument(s)")

awc_dataset = xr.open_dataset(keyword_arguments["netcdf_awc"])

# create a shared memory array, wrap it as a numpy array and copy
# copy the data (values) from this variable's DataArray
var_name = keyword_arguments["var_name_awc"]
shared_array = multiprocessing.Array(
"d", int(np.prod(awc_dataset[var_name].shape))
)
shared_array_np = np.frombuffer(shared_array.get_obj()).reshape(
awc_dataset[var_name].shape
)
shared_array = \
multiprocessing.Array("d", int(np.prod(awc_dataset[var_name].shape)))
shared_array_np = \
np.frombuffer(shared_array.get_obj()).reshape(awc_dataset[var_name].shape)
np.copyto(shared_array_np, awc_dataset[var_name].values)

# add to the dictionary of arrays
Expand Down Expand Up @@ -835,7 +834,8 @@ def _compute_write_index(keyword_arguments):
"var_name_awc": keyword_arguments["var_name_awc"],
},
_KEY_RESULT_SCPDSI,
args,
input_type=input_type,
args=args,
)

# get the computed SCPDSI data as an array of float32 values
Expand Down Expand Up @@ -982,7 +982,8 @@ def _compute_write_index(keyword_arguments):
_global_shared_arrays,
{"var_name_precip": keyword_arguments["var_name_precip"]},
_KEY_RESULT,
args,
input_type=input_type,
args=args,
)

elif keyword_arguments["index"] == "spei":
Expand All @@ -996,7 +997,8 @@ def _compute_write_index(keyword_arguments):
"var_name_pet": keyword_arguments["var_name_pet"],
},
_KEY_RESULT,
args,
input_type=input_type,
args=args,
)

elif keyword_arguments["index"] == "pet":
Expand Down Expand Up @@ -1025,7 +1027,8 @@ def _compute_write_index(keyword_arguments):
"var_name_lat": _KEY_LAT,
},
_KEY_RESULT,
args,
input_type=input_type,
args=args,
)

else:
Expand Down Expand Up @@ -1147,14 +1150,15 @@ def _init_worker(shared_arrays_dict):


# ------------------------------------------------------------------------------
def _parallel_process(index, arrays_dict, input_var_names, output_var_name, args):
def _parallel_process(index, arrays_dict, input_var_names, output_var_name, input_type, args):
"""
TODO document this function
:param index:
:param arrays_dict:
:param input_var_names:
:param output_var_name:
:param str index:
:param dict arrays_dict:
:param dict input_var_names:
:param str output_var_name:
:param InputType input_type:
:param args:
:return:
"""
Expand Down Expand Up @@ -1185,6 +1189,7 @@ def _parallel_process(index, arrays_dict, input_var_names, output_var_name, args
"input_var_name": input_var_names["var_name_precip"],
"output_var_name": output_var_name,
"sub_array_start": split_indices[i],
"input_type": input_type,
"args": args,
}
if i < (_NUMBER_OF_WORKER_PROCESSES - 1):
Expand All @@ -1206,6 +1211,7 @@ def _parallel_process(index, arrays_dict, input_var_names, output_var_name, args
"var_name_pet": input_var_names["var_name_pet"],
"output_var_name": output_var_name,
"sub_array_start": split_indices[i],
"input_type": input_type,
"args": args,
}
if i < (_NUMBER_OF_WORKER_PROCESSES - 1):
Expand All @@ -1227,6 +1233,7 @@ def _parallel_process(index, arrays_dict, input_var_names, output_var_name, args
"var_name_lat": input_var_names["var_name_lat"],
"output_var_name": output_var_name,
"sub_array_start": split_indices[i],
"input_type": input_type,
"args": args,
}
if i < (_NUMBER_OF_WORKER_PROCESSES - 1):
Expand All @@ -1249,6 +1256,7 @@ def _parallel_process(index, arrays_dict, input_var_names, output_var_name, args
"var_name_awc": input_var_names["var_name_awc"],
"output_var_name": output_var_name,
"sub_array_start": split_indices[i],
"input_type": input_type,
"args": args,
}
if i < (_NUMBER_OF_WORKER_PROCESSES - 1):
Expand Down Expand Up @@ -1301,7 +1309,12 @@ def _apply_along_axis(params):
sub_array = np_array[start_index:end_index]
args = params["args"]

computed_array = np.apply_along_axis(func1d, axis=2, arr=sub_array, parameters=args)
if params["input_type"] == InputType.grid:
computed_array = np.apply_along_axis(func1d, axis=2, arr=sub_array, parameters=args)
elif params["input_type"] == InputType.divisions:
computed_array = np.apply_along_axis(func1d, axis=1, arr=sub_array, parameters=args)
else:
raise ValueError(f"Invalid input type argument: {params['input_type']}")

output_array = _global_shared_arrays[params["output_var_name"]][_KEY_ARRAY]
np_output_array = np.frombuffer(output_array.get_obj()).reshape(shape)
Expand Down Expand Up @@ -1356,11 +1369,14 @@ def _apply_along_axis_double(params):
]

for i, (x, y) in enumerate(zip(sub_array_1, sub_array_2)):
for j in range(x.shape[0]):
if params["index"] == "pet":
computed_array[i, j] = func1d(x[j], y, parameters=params["args"])
else:
computed_array[i, j] = func1d(x[j], y[j], parameters=params["args"])
if params["input_type"] == InputType.grid:
for j in range(x.shape[0]):
if params["index"] == "pet":
computed_array[i, j] = func1d(x[j], y, parameters=params["args"])
else:
computed_array[i, j] = func1d(x[j], y[j], parameters=params["args"])
else: # divisions
computed_array[i] = func1d(x, y, parameters=params["args"])


# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -1396,7 +1412,10 @@ def _apply_along_axis_palmers(params):
pet_np_array = np.frombuffer(pet_array.get_obj()).reshape(shape)
sub_array_pet = pet_np_array[start_index:end_index]
awc_array = _global_shared_arrays[awc_array_key][_KEY_ARRAY]
awc_np_array = np.frombuffer(awc_array.get_obj()).reshape([shape[0], shape[1]])
if params["input_type"] == InputType.grid:
awc_np_array = np.frombuffer(awc_array.get_obj()).reshape([shape[0], shape[1]])
else: # divisions
awc_np_array = np.frombuffer(awc_array.get_obj()).reshape(shape[0])
sub_array_awc = awc_np_array[start_index:end_index]

args = params["args"]
Expand Down Expand Up @@ -1427,12 +1446,17 @@ def _apply_along_axis_palmers(params):
start_index:end_index
]

for i, (precip, pet, awc) in enumerate(
zip(sub_array_precip, sub_array_pet, sub_array_awc)
):
for j in range(precip.shape[0]):
scpdsi[i, j], pdsi[i, j], phdi[i, j], pmdi[i, j], zindex[i, j] = func1d(
precip[j], pet[j], awc[j], parameters=args
for i, (precip, pet, awc) in enumerate(zip(sub_array_precip,
sub_array_pet,
sub_array_awc)):
if params["input_type"] == InputType.grid:
for j in range(precip.shape[0]):
scpdsi[i, j], pdsi[i, j], phdi[i, j], pmdi[i, j], zindex[i, j] = func1d(
precip[j], pet[j], awc[j], parameters=args
)
else: # divisions
scpdsi[i], pdsi[i], phdi[i], pmdi[i], zindex[i] = func1d(
precip, pet, awc, parameters=args
)


Expand All @@ -1450,18 +1474,25 @@ def _prepare_file(netcdf_file, var_name):

# make sure we have lat, lon, and time as variable dimensions, regardless of order
ds = xr.open_dataset(netcdf_file)
if len(ds[var_name].dims) == 2:
expected_dims = ("lat", "lon")
dims = "lat,lon"
elif len(ds[var_name].dims) == 3:
expected_dims = ("lat", "lon", "time")
dims = "lat,lon,time"
else:
raise ValueError(
"Unsupported dimensions for variable '{var_name}': {dims}".format(
var_name=var_name, dims=ds[var_name].dims
)
)
dimensions = ds[var_name].dims
if "division" in dimensions:
if len(dimensions) == 1:
expected_dims = ("division",)
dims = "division"
elif len(dimensions) == 2:
expected_dims = ("division", "time")
dims = "division,time"
else:
raise ValueError(f"Unsupported dimensions for variable '{var_name}': {dimensions}")
else: # gridded
if len(dimensions) == 2:
expected_dims = ("lat", "lon")
dims = "lat,lon"
elif len(dimensions) == 3:
expected_dims = ("lat", "lon", "time")
dims = "lat,lon,time"
else:
raise ValueError(f"Unsupported dimensions for variable '{var_name}': {dimensions}")

if Counter(ds[var_name].dims) != Counter(expected_dims):
message = "Invalid dimensions for variable '{var_name}': {dims}".format(
Expand Down Expand Up @@ -1618,6 +1649,7 @@ def main(): # type: () -> None
"index": "spi",
"netcdf_precip": netcdf_precip,
"var_name_precip": arguments.var_name_precip,
"input_type": input_type,
"scale": scale,
"distribution": dist,
"periodicity": arguments.periodicity,
Expand Down Expand Up @@ -1647,6 +1679,7 @@ def main(): # type: () -> None
# keyword arguments used for the PET function
kwargs = {
"index": "pet",
"input_type": input_type,
"netcdf_temp": netcdf_temp,
"var_name_temp": arguments.var_name_temp,
"output_file_base": arguments.output_file_base,
Expand Down Expand Up @@ -1680,6 +1713,7 @@ def main(): # type: () -> None
"var_name_precip": arguments.var_name_precip,
"netcdf_pet": netcdf_pet,
"var_name_pet": arguments.var_name_pet,
"input_type": input_type,
"scale": scale,
"distribution": dist,
"periodicity": arguments.periodicity,
Expand Down Expand Up @@ -1712,6 +1746,7 @@ def main(): # type: () -> None
"index": "pnp",
"netcdf_precip": netcdf_precip,
"var_name_precip": arguments.var_name_precip,
"input_type": input_type,
"scale": scale,
"periodicity": arguments.periodicity,
"calibration_start_year": arguments.calibration_start_year,
Expand Down Expand Up @@ -1744,6 +1779,7 @@ def main(): # type: () -> None
"var_name_pet": arguments.var_name_pet,
"netcdf_awc": netcdf_awc,
"var_name_awc": arguments.var_name_awc,
"input_type": input_type,
"calibration_start_year": arguments.calibration_start_year,
"calibration_end_year": arguments.calibration_end_year,
"output_file_base": arguments.output_file_base,
Expand Down

0 comments on commit 5d309ab

Please sign in to comment.