From f7ddbfdb6b0b7db7a8c0904227ea9328ca8783df Mon Sep 17 00:00:00 2001 From: Huite Date: Wed, 26 Mar 2025 15:06:04 +0100 Subject: [PATCH 1/4] Define name for fill value (clearer that it's a fill value) --- imod/mf6/disv.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/imod/mf6/disv.py b/imod/mf6/disv.py index bf3175cce..ba3910e91 100644 --- a/imod/mf6/disv.py +++ b/imod/mf6/disv.py @@ -131,18 +131,19 @@ def _verts_dataframe(self) -> pd.DataFrame: return df def _cell2d_dataframe(self) -> pd.DataFrame: + XUGRID_FILL = -1 grid = self.dataset.ugrid.grid df = pd.DataFrame(grid.face_coordinates) df.index += 1 # modflow requires clockwise; ugrid requires ccw face_nodes = grid.face_node_connectivity[:, ::-1] - df[2] = (face_nodes != grid.fill_value).sum(axis=1) + df[2] = (face_nodes != XUGRID_FILL).sum(axis=1) for i, column in enumerate(face_nodes.T): # Use extension array to write empty values # Should be more efficient than mixed column? df[3 + i] = pd.arrays.IntegerArray( values=column + 1, - mask=(column == grid.fill_value), + mask=(column == XUGRID_FILL), ) return df From 2fcc17e7afefd371caba4f5b8a0dbb4557bac10f Mon Sep 17 00:00:00 2001 From: Huite Date: Wed, 26 Mar 2025 15:08:47 +0100 Subject: [PATCH 2/4] Write most of the SFR logic --- imod/mf6/sfr.py | 657 ++++++++++++++++++++++++++++++++++ imod/templates/mf6/gwf-sfr.j2 | 61 ++-- 2 files changed, 683 insertions(+), 35 deletions(-) create mode 100644 imod/mf6/sfr.py diff --git a/imod/mf6/sfr.py b/imod/mf6/sfr.py new file mode 100644 index 000000000..ce82cf4af --- /dev/null +++ b/imod/mf6/sfr.py @@ -0,0 +1,657 @@ +from typing import Literal, Union + +import numpy as np +import pandas as pd +import xarray as xr + +from imod.logging import init_log_decorator +from imod.mf6.boundary_condition import AdvancedBoundaryCondition, BoundaryCondition +from imod.mf6.write_context import WriteContext +from imod.schemata import ( + AllValueSchema, + DimsSchema, + DTypeSchema, +) +from imod.select.points import points_indices +from imod.typing.grid import GridDataArray + +STATIC_EDGE_SCHEMA = DimsSchema("layer", "{edge_dim}") +EDGE_SCHEMA = ( + DimsSchema("layer", "{edge_dim}") + | DimsSchema("time", "{edge_dim}") + | DimsSchema("time", "layer", "{edge_dim}") +) + + +def _detect_update(ds: xr.Dataset, edge_dim: str) -> np.ndarray: + # Find at which times a variable changes. + # Create suitable fill value so we don't convert arrays to a different dtype + # None of the variables allow negative values, so -1 will result in a new value. + fill_value = {var: np.astype(np.float64(-1), ds[var].dtype) for var in ds.data_vars} + shifted = ds.shift({"time": 1}, fill_value=fill_value) + mutation_ds = ds != shifted + # Keep the values when no time dimension is present. + for var in mutation_ds.data_vars: + if "time" not in mutation_ds[var].dims: + mutation_ds[var] = True + melted = mutation_ds.to_dataframe().melt(id_vars=[edge_dim, "time"]) + rows_to_keep = melted["value"].to_numpy() + return rows_to_keep + + +class StreamFlowRouting(AdvancedBoundaryCondition): + """ + Stream Flow Routing (SFR) package. + + Parameters + ---------- + reach_length: array of floats (xr.DataArray) + defines the reach length. Must be greater than zero. + reach_width: array of floats (xr.DataArray) + defines the reach width. Must be greater than zero. + reach_gradient: array of floats (xr.DataArray) + defines the stream gradient (slope) across the reach. Must be greater than zero. 
+    reach_top: array of floats (xr.DataArray)
+        defines the top elevation of the reach streambed.
+    streambed_thickness: array of floats (xr.DataArray)
+        defines the thickness of the reach streambed. Must be greater than zero
+        if the reach is connected to an underlying GWF cell.
+    bedk: array of floats (xr.DataArray)
+        defines the hydraulic conductivity of the reach streambed. Must be
+        greater than zero if the reach is connected to an underlying GWF cell.
+    manning_n: array of floats (xr.DataArray)
+        defines Manning's roughness coefficient for the reach. Must be greater
+        than zero.
+    upstream_fraction: array of floats (xr.DataArray)
+        defines the fraction of upstream flow from each upstream reach that is
+        applied as upstream inflow to the reach. Must be between 0 and 1, and
+        the sum of all fractions for reaches connected to the same upstream
+        reach must equal 1.
+    status: array of strings (xr.DataArray, {"active", "inactive", "simple"}), optional
+        defines the status of each reach. Can be "active", "inactive", or
+        "simple". The SIMPLE status option simulates streamflow using a
+        user-specified stage for a reach or a stage set to the top of the
+        reach (depth = 0). In cases where the simulated leakage calculated
+        using the specified stage exceeds the sum of inflows to the reach, the
+        stage is set to the top of the reach and leakage is set equal to the
+        sum of inflows. Upstream fractions should be updated if the status for
+        one or more reaches is changed. For example, if one of two downstream
+        connections for a reach is inactivated, the upstream fraction for the
+        active and inactive downstream reach should be changed to 1.0 and 0.0,
+        respectively, to ensure that the active reach receives all of the
+        downstream outflow from the upstream reach. Default is ACTIVE for all
+        reaches.
+    inflow: array of floats (xr.DataArray, optional)
+        defines the volumetric inflow rate for the streamflow routing reach.
+        Default is zero for each reach.
+    rainfall: array of floats (xr.DataArray, optional)
+        defines the volumetric rate per unit area of water added by
+        precipitation directly on the streamflow routing reach. Default is
+        zero for each reach.
+    evaporation: array of floats (xr.DataArray, optional)
+        defines the volumetric rate per unit area of water subtracted by
+        evaporation from the streamflow routing reach. Default is zero for
+        each reach.
+    runoff: array of floats (xr.DataArray, optional)
+        defines the volumetric rate of diffuse overland runoff that enters the
+        streamflow routing reach. Default is zero for each reach.
+    stage: array of floats (xr.DataArray, optional)
+        defines the stage for the reach. Only applied if the reach uses the
+        simple routing option.
+    diversion_source: array of integers (xr.DataArray, optional)
+        defines the source reach of the diversion.
+    diversion_target: array of integers (xr.DataArray, optional)
+        defines the target reach of the diversion.
+    diversion_priority: array of strings (xr.DataArray, optional)
+        defines the type of diversion. One of "fraction", "excess",
+        "threshold", "upto".
+    diversion_flow: array of floats (xr.DataArray, optional)
+        defines the diversion flow; the exact meaning depends on
+        diversion_priority.
+
+        * 'fraction': The amount of the diversion is computed as a fraction of
+          the streamflow leaving the source reach. In this case, 0.0 <=
+          DIVFLOW <= 1.0.
+
+        * 'excess': A diversion is made only if the source reach flow exceeds
+          the value of DIVFLOW. If this occurs, then the quantity of water
+          diverted is the excess flow (reach flow - DIVFLOW) and the source
+          reach flow is set equal to DIVFLOW. This represents a flood-control
+          type of diversion, as described by Danskin and Hanson (2002).
+
+        * 'threshold': If source reach flow is less than the specified
+          diversion flow DIVFLOW, no water is diverted from reach IFNO. If
+          flow in the source reach is greater than or equal to DIVFLOW,
+          DIVFLOW is diverted and the source reach flow is set to the
+          remainder (flow - DIVFLOW). This approach assumes that once flow in
+          the stream is sufficiently low, diversions from the stream cease,
+          and is the 'priority' algorithm that originally was programmed into
+          the STR1 Package (Prudic, 1989).
+
+        * 'upto': If source reach flow is greater than or equal to the
+          specified diversion flow DIVFLOW, flow is reduced by DIVFLOW. If
+          source reach flow is less than DIVFLOW, DIVFLOW is set to source
+          reach flow and there will be no flow available for reaches connected
+          to the downstream end of the source reach.
+
+    print_input: ({True, False}, optional)
+        keyword to indicate that the list of SFR information will be written
+        to the listing file immediately after it is read. Default is False.
+    print_flows: ({True, False}, optional)
+        keyword to indicate that the list of SFR flow rates will be printed to
+        the listing file for every stress period time step. Default is False.
+    save_flows: ({True, False}, optional)
+        keyword to indicate that SFR flow terms will be written to the file
+        specified with "BUDGET FILEOUT" in Output Control. Default is False.
+    budget_fileout: (str, optional)
+        path to output cbc-file for SFR budgets.
+    budgetcsv_fileout: (str, optional)
+        path to output csv-file for summed budgets.
+    stage_fileout: (str, optional)
+        path to output file for stream stages.
+    validate: {True, False}
+        Flag to indicate whether the package should be validated upon
+        initialization. This raises a ValidationError if package input is
+        provided in the wrong manner. Defaults to True.
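+
+    Examples
+    --------
+    A minimal sketch of intended usage, mirroring the
+    ``examples/mf6/streamflow_routing.py`` script added later in this patch
+    series; the coordinates and parameter values are illustrative only.
+    Reaches are the edges of a ``xugrid.Ugrid1d`` network:
+
+    >>> import numpy as np
+    >>> import xarray as xr
+    >>> import xugrid as xu
+    >>> import imod
+    >>> network = xu.Ugrid1d(
+    ...     node_x=np.array([1000.0, 2000.0, 3000.0, 4000.0]),
+    ...     node_y=np.array([-2500.0, -2500.0, -2500.0, -2500.0]),
+    ...     fill_value=-1,
+    ...     edge_node_connectivity=np.array([[0, 1], [1, 2], [2, 3]]),
+    ... )
+    >>> edgedim = network.edge_dimension
+    >>> uds = xu.UgridDataset(grids=[network])
+    >>> uds["reach_length"] = xr.DataArray(np.full(3, 1000.0), dims=[edgedim])
+    >>> uds["reach_width"] = xr.DataArray(np.full(3, 10.0), dims=[edgedim])
+    >>> uds["reach_gradient"] = xr.DataArray(np.full(3, 1.0e-3), dims=[edgedim])
+    >>> uds["reach_top"] = xr.DataArray(np.array([1.0e-3, 0.0, -1.0e-3]), dims=[edgedim])
+    >>> uds["streambed_thickness"] = xr.DataArray(np.full(3, 0.1), dims=[edgedim])
+    >>> uds["bedk"] = xr.DataArray(np.full(3, 0.1), dims=[edgedim])
+    >>> uds["manning_n"] = xr.DataArray(np.full(3, 0.03), dims=[edgedim])
+    >>> sfr = imod.mf6.StreamFlowRouting(**uds)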
+ """ + + _init_schemata = { + "reach_length": [ + DTypeSchema(np.floating), + STATIC_EDGE_SCHEMA, + ], + "reach_width": [ + DTypeSchema(np.floating), + STATIC_EDGE_SCHEMA, + ], + "reach_gradient": [ + DTypeSchema(np.floating), + STATIC_EDGE_SCHEMA, + ], + "reach_top": [ + DTypeSchema(np.floating), + STATIC_EDGE_SCHEMA, + ], + "streambed_thickness": [ + DTypeSchema(np.floating), + STATIC_EDGE_SCHEMA, + ], + "bedk": [ + DTypeSchema(np.floating), + EDGE_SCHEMA, + ], + "manning_n": [ + DTypeSchema(np.floating), + EDGE_SCHEMA, + ], + "upstream_fraction": [ + DTypeSchema(np.floating), + EDGE_SCHEMA, + ], + "inflow": [ + DTypeSchema(np.floating), + EDGE_SCHEMA | DimsSchema(), # optional var + ], + "rainfall": [ + DTypeSchema(np.floating), + EDGE_SCHEMA | DimsSchema(), # optional var + ], + "evaporation": [ + DTypeSchema(np.floating), + EDGE_SCHEMA | DimsSchema(), # optional var + ], + "runoff": [ + DTypeSchema(np.floating), + EDGE_SCHEMA | DimsSchema(), # optional var + ], + "stage": [ + DTypeSchema(np.floating), + EDGE_SCHEMA | DimsSchema(), # optional var + ], + "diversion_source": [ + DTypeSchema(np.integer), + DimsSchema("diversion") | DimsSchema(), + ], # optional var + "diversion_target": [ + DTypeSchema(np.integer), + DimsSchema("diversion") | DimsSchema(), + ], # optional var + "diversion_priority": [ + DTypeSchema(str), + DimsSchema("diversion") | DimsSchema(), + ], # optional var + "diversion_flow": [ + DTypeSchema(np.floating), + DimsSchema("time", "diversion") | DimsSchema("diversion") | DimsSchema(), + ], # optional var + "print_flows": [DTypeSchema(np.bool_), DimsSchema()], + "save_flows": [DTypeSchema(np.bool_), DimsSchema()], + } + + _write_schemata = { + "reach_length": [AllValueSchema(">", 0.0)], + "reach_width": [AllValueSchema(">", 0.0)], + "reach_gradient": [AllValueSchema(">", 0.0)], + "streambed_thickness": [AllValueSchema(">", 0.0)], + "bedk": [AllValueSchema(">", 0.0)], + "manning_n": [AllValueSchema(">", 0.0)], + "upstream_fraction": [ + AllValueSchema(">=", 0.0), + AllValueSchema("<=", 1.0), + ], + "diversion_source": [AllValueSchema(">=", 0)], # positive edge_index + "diversion_target": [AllValueSchema(">=", 0)], # positive edge_index + "diversion_flow": [AllValueSchema(">=", 0.0)], + # TODO: diversion_priority should be fraction, excess, threshold, upto + # TODO: status should be active, inactive, simple + "rainfall": [AllValueSchema(">=", 0.0)], + "evaporation": [AllValueSchema(">=", 0.0)], + "runoff": [AllValueSchema(">=", 0.0)], + } + + _package_data = ( + "reach_length", + "reach_width", + "reach_gradient", + "reach_top", + "streambed_thickness", + "bedk", + "manning_n", + "upstream_fraction", + ) + + _period_data = ( + "status", + "bedk", + "manning_n", + "stage", + "inflow", + "rainfall", + "evaporation", + "runoff", + "upstream_fraction", + ) + + _pkg_id = "sfr" + + _template = BoundaryCondition._initialize_template(_pkg_id) + + @init_log_decorator() + def __init__( + self, + reach_length, + reach_width, + reach_gradient, + reach_top, + streambed_thickness, + bedk, + manning_n, + upstream_fraction=None, # assumes equal fraction when not defined + status: Union[xr.DataArray, Literal["active", "inactive", "simple"]] = "active", + inflow=None, + rainfall=None, + evaporation=None, + runoff=None, + stage=None, + diversion_source=None, + diversion_target=None, + diversion_priority=None, + diversion_flow=None, + storage: bool = False, + maximum_iterations: int = 100, + maximum_depth_change: float = 1e-5, + length_conversion: float = 1.0, # assumes unit is meter! 
+ time_conversion: float = 86_400.0, # assumes unit is day! + print_input: bool = False, + print_flows: bool = False, + save_flows: bool = False, + budget_fileout=None, + budgetcsv_fileout=None, + stage_fileout=None, + observations=None, + water_mover=None, + timeseries=None, + validate: bool = True, + ): + dict_dataset = { + # Package data + "reach_length": reach_length, + "reach_width": reach_width, + "reach_gradient": reach_gradient, + "reach_top": reach_top, + "streambed_thickness": streambed_thickness, + "bedk": bedk, + "manning_n": manning_n, + "upstream_fraction": upstream_fraction, + # Stress period data + "status": status, + "inflow": inflow, + "rainfall": rainfall, + "evaporation": evaporation, + "runoff": runoff, + "stage": stage, + # Diversions + "diversion_source": diversion_source, + "diversion_target": diversion_target, + "diversion_priority": diversion_priority, + "diversion_flow": diversion_flow, + # Options + "storage": storage, + "maximum_iterations": maximum_iterations, + "maximum_depth_change": maximum_depth_change, + "length_conversion": length_conversion, + "time_conversion": time_conversion, + "print_input": print_input, + "print_flows": print_flows, + "save_flows": save_flows, + "budget_fileout": budget_fileout, + "budgetcsv_fileout": budgetcsv_fileout, + "stage_fileout": stage_fileout, + "observations": observations, + "water_mover": water_mover, + "timeseries": timeseries, + } + super().__init__(dict_dataset) + self._validate_init_schemata(validate) + + # TODO: Borrowed and edited from lake package + # Can probably be generalized further: DISV also uses something similar! + def _write_table_section( + self, f, dataframe: pd.DataFrame, title: str, index: bool = False + ) -> None: + """ + writes a dataframe to file. Used for the connection data and for the outlet data. + """ + # TODO: might not need full precision on floating point numbers! + # 4 digits is likely sufficient + f.write(f"begin {title}\n") + dataframe.to_csv( + f, + index=index, + header=False, + sep=" ", + lineterminator="\n", + ) + f.write(f"end {title}\n") + return + + def _find_cellid(self, dst_grid: GridDataArray) -> pd.DataFrame: + # Find the centroid of each reach + x, y = self.dataset.ugrid.grid.edge_coordinates.T + # Find indices belonging to x, y coordinates + indices_cell2d = points_indices(dst_grid, out_of_bounds="ignore", x=x, y=y) + # Convert cell2d indices from 0-based to 1-based. + indices_cell2d = {dim: index + 1 for dim, index in indices_cell2d.items()} + return pd.DataFrame(indices_cell2d) + + def _derive_upstream_fraction(self, upstream_fraction_data) -> np.ndarray: + network = self.dataset.ugrid.grid + downstream_reaches = network.directed_edge_edge_connectivity + upstream_reaches = downstream_reaches.transpose().tocsr() + + # Check topology: "X"-crossing (combination of confluence and bifurcation isn't allowed) + n_upstream = upstream_reaches.getnnz(axis=1) + n_downstream = downstream_reaches.getnnz(axis=1) + complex_junction = (n_upstream > 1) & (n_downstream > 1) + if complex_junction.any(): + n_complex = complex_junction.sum() + # TODO: ideally we add a function to return these bad reaches? + # Printing them here might give (too) large error messages. + raise ValueError( + "A reach should have either multiple upstream reaches (confluence)" + "or multiple downstream reaches (furcation), but not both. " + "For complex junctions, the upstream_fraction is ambiguous. " + f"This SFR network contains {n_complex} junctions." 
+ ) + + if upstream_fraction_data is None: + # By default, assume equal split. + n_downstream = downstream_reaches.getnnz(axis=1) + fraction_downstream = 1.0 / n_downstream + # Get upstream_fraction for each reach. + return fraction_downstream[upstream_reaches.indices] + else: + # Validate that the provided data sum to one. + # TODO: move into validation? + # TODO: validate each time step if provided in transient form. + if "time" in upstream_fraction_data.dims: + upstream_fraction_data = upstream_fraction_data.isel(time=0) + + to_validate = downstream_reaches.copy() + to_validate.data = upstream_fraction_data[downstream_reaches.indices] + fraction_sum = to_validate.sum(axis=1) + # TODO: ideally we add a function to return these bad reaches? + not_one = ~np.isclose(fraction_sum, 1.0) + if not_one.any(): + # TODO: ideally we add a function to return these bad reaches? + # Printing them here might give (too) large error messages. + raise ValueError( + "upstream_fraction does not sum to 1.0 for all reaches." + ) + return upstream_fraction_data + + def _packagedata_dataframe( + self, dst_grid, upstream_reaches, downstream_reaches + ) -> pd.DataFrame: + dataset = self.dataset + network = dataset.ugrid.grid + # TODO: assign to which layer? + # Assigning to multiple layers is problematic + # Maybe use river allocate logic? + cellid = self._find_cellid(dst_grid) + # TODO: layer currently missing + + package_data_vars = list(self._package_data) + # Derive upstream_fraction if not provided (simple networks) + package_data_vars.pop("upstream_fraction") + + ds = self.dataset[package_data_vars] + if "time" in ds.dims: + ds = ds.isel(time=0) + + df = ds.to_dataframe() + df["upstream_fraction"] = self._derive_upstream_fraction( + dataset["upstream_fraction"], upstream_reaches, downstream_reaches + ) + if dataset["diversion_source"] is None: + df["number_of_diversions"] = 0 + else: + df["number_of_diversions"] = np.bincount( + dataset["diversion_source"], minlength=network.n_edge + ) + return pd.concat((cellid, df), axis=1) + + def _connectiondata_dataframe(self) -> pd.DataFrame: + # MODFLOW 6 wants the upstream reaches numbered POSITIVE + # and the downstream reaches numbered NEGATIVE + network = self.dataset.ugrid.grid + # Derive the connectivity for each reach (edge) + downstream = network.directed_edge_edge_connectivity + upstream = downstream.transpose().tocsr() + # Increment by 2 because a 0 will be seen as a structural zero; + # and the default fill value of the format method is -1. + downstream.data = downstream.indices + 2 + upstream.data = upstream.indices + 2 + sfr_connectivity = network.format_connectivity_as_dense( + upstream - downstream + ) + # Sort decreasing, this'll move the -1 to a middle position, + # but the pandas writer will omit the masked value. + sfr_connectivity.sort(axis=1) + + # Use pandas Integer array for NA (masked) values. + df = pd.DataFrame() + for i, column in enumerate(sfr_connectivity.T): + df[i] = pd.arrays.IntegerArray( + # MODFLOW starts numbering at 1. + values=column - 1, + mask=(column == -1), + ) + # Increment the index by one, as MODFLOW numbers ifno from 1. 
+ df.index += 1 + return df + + def _crosssections_dataframe(self) -> Union[pd.DataFrame, None]: + # TODO: + return None + + def _diversions_dataframe(self) -> Union[pd.DataFrame, None]: + diversion_source = self.dataset["diversion_source"] + if diversion_source is None: + return None + diversion_df = self.dataset[ + ["diversion_source", "diversion_target", "diversion_priority"] + ].to_dataframe() + diversion_df["diversion_index"] = np.arange(len(diversion_source)) + 1 + # Make sure columns are returned in the right order + return diversion_df[ + [ + "diversion_source", + "diversion_index", + "diversion_target", + "diversion_priority", + ] + ] + + def _initialstages_dataframe(self) -> Union[pd.DataFrame, None]: + stage = self.dataset["stage"] + if stage is not None: + if "time" in stage.dims: + initialstages = stage.isel(time=0) + else: + initialstages = stage + df = initialstages.to_dataframe() + df.index += 1 + return df + + def _perioddata_dataframe(self, globaltimes) -> Union[pd.DataFrame, None]: + period_ds = self.dataset[self._period_data] + edge_dim = self.dataset.ugrid.grid.edge_dimension + + # These variables are both package data and period data. If they + # aren't transient, the package data entry suffices, and they don't + # need to be written in the period block. + # Also drop optional transient entries. + vars_to_drop = [ + var + for var in ("bedk", "manning", "stage", "upstream_fraction") + if "time" not in self.dataset[var].dims + ] + [var for var in self._period_data if self.dataset[var] is None] + period_ds = period_ds.drop_vars(vars_to_drop) + if len(period_ds.data_vars) == 0: + return None + + # Some variables can only be specified in the period block. + # If all are constant over time, create a one-sized time dimension. + if "time" not in period_ds: + period_ds = period_ds.expand_dims("time") + period_ds["time"] = [1] + else: + period_ds["time"] = np.searchsorted(period_ds["time"], globaltimes) + 1 + + rows_to_keep = _detect_update(period_ds, edge_dim) + period_df = period_ds.to_dataframe() + updates = period_df.melt(id_vars=[edge_dim, "time"]).iloc[rows_to_keep] + + diversion_source = self.dataset["diversion_source"] + if diversion_source is not None: + # Diversion source contains the edge index + # Detect changes of the flow + diversion_ds = self.dataset[["diversion_flow"]] + diversion_ds["time"] = ( + np.searchsorted(diversion_ds["time"], globaltimes) + 1 + ) + diversion_to_keep = _detect_update(diversion_ds, edge_dim) + # Add the edge_index (IFNO), this'll end up as an index + diversion_ds[edge_dim] = diversion_source + diversion_ds["diversion_index"] = np.arange(len(diversion_source)) + 1 + diversion_df = diversion_ds.to_dataframe() + # Concatenate the entries into a single column so it'll match the other period data. 
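+            # e.g. diversion_index 1 with a flow of 0.5 becomes the period
+            # entry "1 0.5" (hypothetical values).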
+ diversion_df["diversion_entry"] = ( + diversion_df["diversion_index"].astype(str) + + " " + + diversion_df["diversion_flow"].astype(str) + ) + diversion_updates = ( + diversion_df[[edge_dim, "time", "diversion_entry"]] + .melt(id_vars=[edge_dim, "time"]) + .iloc[diversion_to_keep] + ) + updates = pd.concat((updates, diversion_updates)) + + return updates + + def render(self, directory, pkgname, globaltimes, binary) -> str: + d = { + "storage": storage, + "maximum_iterations": maximum_iterations, + "maximum_depth_change": maximum_depth_change, + "length_conversion": length_conversion, + "time_conversion": time_conversion, + "print_input": print_input, + "print_flows": print_flows, + "save_flows": save_flows, + "budget_fileout": budget_fileout, + "budgetcsv_fileout": budgetcsv_fileout, + "stage_fileout": stage_fileout, + "observations": observations, + "water_mover": water_mover, + "timeseries": timeseries, + "nreaches": self.dataset.ugrid.grid.n_edge, + } + return self._template.render(d) + + def write_blockfile( + self, pkgname, globaltimes, write_context: WriteContext, idomain + ): + dir_for_render = write_context.get_formatted_write_directory() + content = self.render( + dir_for_render, pkgname, globaltimes, write_context.use_binary + ) + filename = write_context.write_directory / f"{pkgname}.{self._pkg_id}" + + packagedata_df = self._packagedata_dataframe() + + with open(filename, "w") as f: + f.write(content) + f.write("\n\n") + + connectiondata_df = self._connectiondata_dataframe() + crosssection_df = self._crosssections_dataframe() + packagedata_df = self._packagedata_dataframe(idomain) + diversions_df = self._diversions_dataframe() + initialstages_df = self._initialstages_dataframe() + perioddata_df = self._perioddata_dataframe(globaltimes) + + self._write_table_section(f, packagedata_df, title="packagedata") + if crosssection_df is not None: + self._write_table_section(f, crosssection_df, title="crossections") + self._write_table_section(f, connectiondata_df, title="connectiondata") + if diversions_df is not None: + self._write_table_section(f, diversions_df, title="diversions") + if initialstages_df is not None: + self._write_table_section( + f, initialstages_df, title="initialstages" + ) + if perioddata_df is not None: + # Iterate over the periods + for period_number, period_df in perioddata_df.groupby("time"): + self._write_table_section( + f, period_df, title=f"period {period_number}" + ) + + return + + # Overloaded, neutralized methods + # TODO: would be more convenient to move these into a single function to overload? 
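+    # These no-op overrides disable AdvancedBoundaryCondition behavior that
+    # assumes cell-based input; SFR writes its own table sections instead.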
+ + def _package_data_to_sparse(self): + return + + def fill_stress_perioddata(self): + # this function is called from packagebase and should do nothing in this context + return + + def _write_perioddata(self, directory, pkgname, binary): + # this function is called from packagebase and should do nothing in this context + return + + def is_splitting_supported(self) -> bool: + return False + + def is_regridding_supported(self) -> bool: + return False + + def is_clipping_supported(self) -> bool: + return False diff --git a/imod/templates/mf6/gwf-sfr.j2 b/imod/templates/mf6/gwf-sfr.j2 index dec911c69..cc18f7005 100644 --- a/imod/templates/mf6/gwf-sfr.j2 +++ b/imod/templates/mf6/gwf-sfr.j2 @@ -1,42 +1,33 @@ begin options -{%- if auxiliary is defined -%} auxiliary {{auxiliary(naux)}}{%- endif -%} -{%- if boundnames is defined -%} boundnames{%- endif -%} -{%- if print_input is defined -%} print_input{%- endif -%} -{%- if print_stage is defined -%} print_stage{%- endif -%} -{%- if print_flows is defined -%} print_flows{%- endif -%} -{%- if save_flows is defined -%} save_flows{%- endif -%} -{%- if stage_filerecord is defined -%} stage fileout {{stagefile}}{%- endif -%} -{%- if budget_filerecord is defined -%} budget fileout {{budgetfile}}{%- endif -%} -{%- if ts_filerecord is defined -%} ts6 filein {{ts6_filename}}{%- endif -%} -{%- if obs_filerecord is defined -%} obs6 filein {{obs6_filename}}{%- endif -%} -{%- if mover is defined -%} mover{%- endif -%} -{%- if maximum_iterations is defined -%} maximum_iterations {{maximum_iterations}}{%- endif -%} -{%- if maximum_depth_change is defined -%} maximum_depth_change {{maximum_depth_change}}{%- endif -%} -{%- if unit_conversion is defined -%} unit_conversion {{unit_conversion}}{%- endif -%} +{%- if storage is defined %} + storage{% endif %} +{%- if print_input is defined %} + print_input{% endif %} +{%- if print_stage is defined %} + print_stage{% endif %} +{%- if print_flows is defined %} + print_flows{% endif %} +{%- if save_flows is defined %} + save_flows{% endif %} +{%- if budget_fileout is defined %} + budget fileout {{budget_fileout}}{% endif %} +{%- if budgetcsv_fileout is defined %} + budgetcsv fileout {{budgetcsv_fileout}}{% endif %} +{%- if mover is defined %} + mover{% endif %} +{%- if maximum_picard_iterations is defined %} + maximum_picard_iterations {{maximum_picard_iterations}}{% endif %} +{%- if maximum_iterations is defined %} + maximum_iterations {{maximum_iterations}}{% endif %} +{%- if maximum_depth_change is defined %} + maximum_depth_change {{maximum_depth_change}}{% endif %} +{%- if length_conversion is defined %} + length_conversion {{length_conversion}}{% endif %} +{%- if time_conversion is defined %} + time_conversion {{time_conversion}}{% endif %} end options begin dimensions nreaches {{nreaches}} end dimensions -begin packagedata - {{rno}} {{cellid(ncelldim)}} {{rlen}} {{rwid}} {{rgrd}} {{rtp}} {{rbth}} {{rhk}} {{man}} {{ncon}} {{ustrf}} {{ndv}} {%- if aux is defined -%}{{aux(naux)}}{%- endif -%} {%- if boundname is defined -%}{{boundname}}{%- endif -%} - {{rno}} {{cellid(ncelldim)}} {{rlen}} {{rwid}} {{rgrd}} {{rtp}} {{rbth}} {{rhk}} {{man}} {{ncon}} {{ustrf}} {{ndv}} {%- if aux is defined -%}{{aux(naux)}}{%- endif -%} {%- if boundname is defined -%}{{boundname}}{%- endif -%} - ... -end packagedata - -begin connectiondata - {{rno}} {{ic(ncon(rno))}} - {{rno}} {{ic(ncon(rno))}} - ... -end connectiondata - -begin diversions - {{rno}} {{idv}} {{iconr}} {{cprior}} - {{rno}} {{idv}} {{iconr}} {{cprior}} - ... 
-end diversions - -{% for i, path in periods.items() %}begin period {{i}} - open/close {{path}} (binary) -end period{% endfor %} \ No newline at end of file From 84625a92fb86ad54cea73e613fdc6890f671dcce Mon Sep 17 00:00:00 2001 From: Huite Date: Thu, 27 Mar 2025 22:58:24 +0100 Subject: [PATCH 3/4] Basic example seems to run! --- examples/mf6/streamflow_routing.py | 136 +++++++ imod/mf6/__init__.py | 1 + imod/mf6/model.py | 22 + imod/mf6/package.py | 2 +- imod/mf6/sfr.py | 634 +++++++++++++++-------------- 5 files changed, 492 insertions(+), 303 deletions(-) create mode 100644 examples/mf6/streamflow_routing.py diff --git a/examples/mf6/streamflow_routing.py b/examples/mf6/streamflow_routing.py new file mode 100644 index 000000000..16876e529 --- /dev/null +++ b/examples/mf6/streamflow_routing.py @@ -0,0 +1,136 @@ +# %% +from pathlib import Path + +import numpy as np +import xarray as xr +import xugrid as xu + +import imod + +# %% +nlayer = 1 +nrow = 5 +ncol = 5 + +dx = 1000.0 +dy = -1000.0 + +x = np.arange(0, dx * ncol, dx) + 0.5 * dx +y = np.arange(0, dy * nrow, dy) + 0.5 * dy +layer = np.array([1]) + +idomain = xr.DataArray( + np.ones((nlayer, nrow, ncol), dtype=int), + dims=("layer", "y", "x"), + coords={"layer": layer, "y": y, "x": x}, +) +# %% + +elevation = xr.full_like(idomain.sel(layer=1), 5.0, dtype=float) +conductance = xr.full_like(idomain.sel(layer=1), 1000.0**2, dtype=float) +rch_rate = xr.full_like(idomain.sel(layer=1), 0.001, dtype=float) + +# Node properties +icelltype = xr.DataArray([0], {"layer": layer}, ("layer",)) +k = xr.DataArray([10.0], {"layer": layer}, ("layer",)) +bottom = xr.DataArray([-10.0], {"layer": layer}, ("layer",)) + +# %% + +network = xu.Ugrid1d( + node_x=np.array([1000.0, 2000.0, 3000.0, 4000.0]), + node_y=np.array([-2500.0, -2500.0, -2500.0, -2500.0]), + fill_value=-1, + edge_node_connectivity=np.array( + [ + [0, 1], + [1, 2], + [2, 3], + ] + ), +) +edgedim = network.edge_dimension +uds = xu.UgridDataset(grids=[network]) +uds["reach_length"] = xr.DataArray(np.full(3, 1000.0), dims=[edgedim]) +uds["reach_width"] = xr.DataArray(np.full(3, 10.0), dims=[edgedim]) +uds["reach_gradient"] = xr.DataArray(np.full(3, 1.0e-3), dims=[edgedim]) +uds["reach_top"] = xr.DataArray(np.array([1.0e-3, 0.0, -1.0e-3]), dims=[edgedim]) +uds["streambed_thickness"] = xr.DataArray(np.full(3, 0.1), dims=[edgedim]) +uds["bedk"] = xr.DataArray(np.full(3, 0.1), dims=[edgedim]) +uds["manning_n"] = xr.DataArray(np.full(3, 0.03), dims=[edgedim]) +uds["layer"] = xr.DataArray(np.full(3, 1), dims=[edgedim]) +sfr = imod.mf6.StreamFlowRouting(**uds) + + +# %% + +gwf_model = imod.mf6.GroundwaterFlowModel() +gwf_model["dis"] = imod.mf6.StructuredDiscretization( + top=5.0, bottom=bottom, idomain=idomain +) +gwf_model["drn"] = imod.mf6.Drainage( + elevation=elevation, + conductance=conductance, + print_input=True, + print_flows=True, + save_flows=True, +) +gwf_model["ic"] = imod.mf6.InitialConditions(start=0.0) +gwf_model["npf"] = imod.mf6.NodePropertyFlow( + icelltype=icelltype, + k=k, + variable_vertical_conductance=True, + dewatered=True, + perched=True, + save_flows=True, +) +gwf_model["sto"] = imod.mf6.SpecificStorage( + specific_storage=1.0e-5, + specific_yield=0.15, + transient=False, + convertible=0, +) +gwf_model["oc"] = imod.mf6.OutputControl(save_head="all", save_budget="all") +gwf_model["rch"] = imod.mf6.Recharge(rch_rate) +gwf_model["sfr"] = sfr +# %% + +# Attach it to a simulation +simulation = imod.mf6.Modflow6Simulation("sfr-test") +simulation["GWF_1"] = gwf_model +# Define 
solver settings +simulation["solver"] = imod.mf6.Solution( + modelnames=["GWF_1"], + print_option="summary", + outer_dvclose=1.0e-4, + outer_maximum=500, + under_relaxation=None, + inner_dvclose=1.0e-4, + inner_rclose=0.001, + inner_maximum=100, + linear_acceleration="cg", + scaling_method=None, + reordering_method=None, + relaxation_factor=0.97, +) +# Collect time discretization +simulation.create_time_discretization( + additional_times=["2000-01-01", "2000-01-02", "2000-01-03", "2000-01-04"] +) + +modeldir = Path("examples/mf6/stream-routing") +simulation.write(modeldir) +# %% +simulation.run() + +# %% + +head = imod.mf6.open_hds( + modeldir / "GWF_1/GWF_1.hds", + modeldir / "GWF_1/dis.dis.grb", +) +# %% + +head.isel(time=0, layer=0).plot.contourf() + +# %% diff --git a/imod/mf6/__init__.py b/imod/mf6/__init__.py index 8cd53bbfe..877f1603d 100644 --- a/imod/mf6/__init__.py +++ b/imod/mf6/__init__.py @@ -49,6 +49,7 @@ ) from imod.mf6.rch import Recharge from imod.mf6.riv import River +from imod.mf6.sfr import StreamFlowRouting from imod.mf6.simulation import Modflow6Simulation from imod.mf6.src import MassSourceLoading from imod.mf6.ssm import SourceSinkMixing diff --git a/imod/mf6/model.py b/imod/mf6/model.py index 5b3c4b038..c915e57c0 100644 --- a/imod/mf6/model.py +++ b/imod/mf6/model.py @@ -309,6 +309,21 @@ def _prepare_wel_for_mf6( validate_context, ) + @standard_log_decorator() + def _prepare_sfr_for_mf6( + self, pkgname: str, validate_context: ValidationContext + ) -> Mf6Wel: + pkg = self[pkgname] + top, bottom, idomain = self._get_domain_geometry() + k = self._get_k() + return pkg._to_mf6_pkg( + idomain, + top, + bottom, + k, + validate_context, + ) + @standard_log_decorator() def write( self, @@ -395,6 +410,13 @@ def _write( globaltimes=globaltimes, write_context=pkg_write_context, ) + elif isinstance(pkg, imod.mf6.StreamFlowRouting): + mf6_sfr_pkg = self._prepare_sfr_for_mf6(pkg_name, validate_context) + mf6_sfr_pkg._write( + pkgname=pkg_name, + globaltimes=globaltimes, + write_context=pkg_write_context, + ) elif issubclass(type(pkg), imod.mf6.HorizontalFlowBarrierBase): mf6_hfb_ls.append(pkg) else: diff --git a/imod/mf6/package.py b/imod/mf6/package.py index db42ff208..9d19adf7c 100644 --- a/imod/mf6/package.py +++ b/imod/mf6/package.py @@ -373,7 +373,7 @@ def _validate_init_schemata(self, validate: bool): def copy(self) -> Any: # All state should be contained in the dataset. 
- return type(self)(**self.dataset.copy().to_dict()) + return type(self)(**self.dataset.copy()) def clip_box( self, diff --git a/imod/mf6/sfr.py b/imod/mf6/sfr.py index ce82cf4af..d41cd4dbf 100644 --- a/imod/mf6/sfr.py +++ b/imod/mf6/sfr.py @@ -5,7 +5,7 @@ import xarray as xr from imod.logging import init_log_decorator -from imod.mf6.boundary_condition import AdvancedBoundaryCondition, BoundaryCondition +from imod.mf6.boundary_condition import BoundaryCondition from imod.mf6.write_context import WriteContext from imod.schemata import ( AllValueSchema, @@ -15,12 +15,8 @@ from imod.select.points import points_indices from imod.typing.grid import GridDataArray -STATIC_EDGE_SCHEMA = DimsSchema("layer", "{edge_dim}") -EDGE_SCHEMA = ( - DimsSchema("layer", "{edge_dim}") - | DimsSchema("time", "{edge_dim}") - | DimsSchema("time", "layer", "{edge_dim}") -) +STATIC_EDGE_SCHEMA = DimsSchema("{edge_dim}") +EDGE_SCHEMA = DimsSchema("{edge_dim}") | DimsSchema("time", "{edge_dim}") def _detect_update(ds: xr.Dataset, edge_dim: str) -> np.ndarray: @@ -39,7 +35,7 @@ def _detect_update(ds: xr.Dataset, edge_dim: str) -> np.ndarray: return rows_to_keep -class StreamFlowRouting(AdvancedBoundaryCondition): +class StreamFlowRouting(BoundaryCondition): """ Stream Flow Routing (SFR) package. @@ -61,7 +57,10 @@ class StreamFlowRouting(AdvancedBoundaryCondition): if reach is connected to an underlying GWF cell. manning_n: array of floats (xr.DataArray) defines the Manning's roughness coefficient for the reach. Must be greater than zero. - upstream_fraction: array of floats (xr.DataArray) + layer: array of integers (xr.DataArray), optional + to which MODFLOW layer the reach is connected. A value of 0 can be specified for + reaches that are not connected to the groundwater. + upstream_fraction: array of floats (xr.DataArray), optional defines the fraction of upstream flow from each upstream reach that is applied as upstream inflow to the reach. Must be between 0 and 1, and sum of all fractions for reaches connected to same upstream reach must equal 1. @@ -175,9 +174,13 @@ class StreamFlowRouting(AdvancedBoundaryCondition): DTypeSchema(np.floating), EDGE_SCHEMA, ], + "layer": [ + DTypeSchema(np.integer), + EDGE_SCHEMA, + ], "upstream_fraction": [ DTypeSchema(np.floating), - EDGE_SCHEMA, + EDGE_SCHEMA | DimsSchema(), # optional var ], "inflow": [ DTypeSchema(np.floating), @@ -226,6 +229,7 @@ class StreamFlowRouting(AdvancedBoundaryCondition): "streambed_thickness": [AllValueSchema(">", 0.0)], "bedk": [AllValueSchema(">", 0.0)], "manning_n": [AllValueSchema(">", 0.0)], + "layer": [AllValueSchema(">=", 0.0)], "upstream_fraction": [ AllValueSchema(">=", 0.0), AllValueSchema("<=", 1.0), @@ -277,6 +281,7 @@ def __init__( streambed_thickness, bedk, manning_n, + layer=None, # assign based on reach_top if None? upstream_fraction=None, # assumes equal fraction when not defined status: Union[xr.DataArray, Literal["active", "inactive", "simple"]] = "active", inflow=None, @@ -292,16 +297,12 @@ def __init__( maximum_iterations: int = 100, maximum_depth_change: float = 1e-5, length_conversion: float = 1.0, # assumes unit is meter! - time_conversion: float = 86_400.0, # assumes unit is day! 
print_input: bool = False, print_flows: bool = False, save_flows: bool = False, budget_fileout=None, budgetcsv_fileout=None, stage_fileout=None, - observations=None, - water_mover=None, - timeseries=None, validate: bool = True, ): dict_dataset = { @@ -313,6 +314,7 @@ def __init__( "streambed_thickness": streambed_thickness, "bedk": bedk, "manning_n": manning_n, + "layer": layer, "upstream_fraction": upstream_fraction, # Stress period data "status": status, @@ -331,327 +333,355 @@ def __init__( "maximum_iterations": maximum_iterations, "maximum_depth_change": maximum_depth_change, "length_conversion": length_conversion, - "time_conversion": time_conversion, "print_input": print_input, "print_flows": print_flows, "save_flows": save_flows, "budget_fileout": budget_fileout, "budgetcsv_fileout": budgetcsv_fileout, "stage_fileout": stage_fileout, - "observations": observations, - "water_mover": water_mover, - "timeseries": timeseries, } super().__init__(dict_dataset) self._validate_init_schemata(validate) - # TODO: Borrowed and edited from lake package - # Can probably be generalized further: DISV also uses something similar! - def _write_table_section( - self, f, dataframe: pd.DataFrame, title: str, index: bool = False - ) -> None: - """ - writes a dataframe to file. Used for the connection data and for the outlet data. - """ - # TODO: might not need full precision on floating point numbers! - # 4 digits is likely sufficient - f.write(f"begin {title}\n") - dataframe.to_csv( - f, - index=index, - header=False, - sep=" ", - lineterminator="\n", + # TODO: Borrowed and edited from lake package + # Can probably be generalized further: DISV also uses something similar! + def _write_table_section( + self, f, dataframe: pd.DataFrame, title: str, index: bool = False + ) -> None: + """ + writes a dataframe to file. Used for the connection data and for the outlet data. + """ + # TODO: might not need full precision on floating point numbers! + # 4 digits is likely sufficient + f.write(f"begin {title}\n") + dataframe.to_csv( + f, + index=index, + header=False, + sep=" ", + lineterminator="\n", + ) + f.write(f"end {title}\n") + return + + def _find_cellid(self, dst_grid: GridDataArray) -> pd.DataFrame: + # Find the centroid of each reach + network = self.dataset.ugrid.grid + x, y = network.edge_coordinates.T + # Find indices belonging to x, y coordinates + indices_cell2d = points_indices(dst_grid, out_of_bounds="ignore", x=x, y=y) + # Fill in values of 0 for all indices when unconnected. + # Convert cell2d indices from 0-based to 1-based. 
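+        # e.g. (hypothetically) a reach in layer 1 over the cell at row 3,
+        # column 4 becomes the 1-based MODFLOW cellid (1, 3, 4); an
+        # unconnected reach (layer 0) becomes (0, 0, 0).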
+ layer = self.dataset["layer"].to_numpy() + unconnected = layer == 0 + indices = np.stack( + [layer] + + [ + np.where(unconnected, 0, index + 1) + for index in reversed(indices_cell2d.values()) + ], + axis=1, + ) + labels = ["layer"] + list(indices_cell2d.keys()) + return xr.DataArray( + indices, + dims=(network.edge_dimension, "dimension"), + coords={"dimension": labels}, + ) + + def _to_mf6_pkg( + self, + idomain, + top, + bottom, + k, + validate_context, + ) -> "StreamFlowRouting": + new = self.copy() + new.dataset["cellid"] = self._find_cellid(idomain) + return new + + def _derive_upstream_fraction(self, upstream_fraction_data) -> np.ndarray: + network = self.dataset.ugrid.grid + downstream_reaches = network.directed_edge_edge_connectivity + upstream_reaches = downstream_reaches.transpose().tocsr() + + # Check topology: "X"-crossing (combination of confluence and bifurcation isn't allowed) + n_upstream = upstream_reaches.getnnz(axis=1) + n_downstream = downstream_reaches.getnnz(axis=1) + complex_junction = (n_upstream > 1) & (n_downstream > 1) + if complex_junction.any(): + n_complex = complex_junction.sum() + # TODO: ideally we add a function to return these bad reaches? + # Printing them here might give (too) large error messages. + raise ValueError( + "A reach should have either multiple upstream reaches (confluence)" + "or multiple downstream reaches (furcation), but not both. " + "For complex junctions, the upstream_fraction is ambiguous. " + f"This SFR network contains {n_complex} junctions." ) - f.write(f"end {title}\n") - return - - def _find_cellid(self, dst_grid: GridDataArray) -> pd.DataFrame: - # Find the centroid of each reach - x, y = self.dataset.ugrid.grid.edge_coordinates.T - # Find indices belonging to x, y coordinates - indices_cell2d = points_indices(dst_grid, out_of_bounds="ignore", x=x, y=y) - # Convert cell2d indices from 0-based to 1-based. - indices_cell2d = {dim: index + 1 for dim, index in indices_cell2d.items()} - return pd.DataFrame(indices_cell2d) - - def _derive_upstream_fraction(self, upstream_fraction_data) -> np.ndarray: - network = self.dataset.ugrid.grid - downstream_reaches = network.directed_edge_edge_connectivity - upstream_reaches = downstream_reaches.transpose().tocsr() - - # Check topology: "X"-crossing (combination of confluence and bifurcation isn't allowed) - n_upstream = upstream_reaches.getnnz(axis=1) + + # TODO: efficiently check for None + if upstream_fraction_data.isnull(): + # By default, assume equal split. n_downstream = downstream_reaches.getnnz(axis=1) - complex_junction = (n_upstream > 1) & (n_downstream > 1) - if complex_junction.any(): - n_complex = complex_junction.sum() + fraction_downstream = 1.0 / n_downstream + # Get upstream_fraction for each reach. + # TODO: think this over once more + fraction_upstream = np.ones(network.n_edge) + fraction_upstream[downstream_reaches.indices] = fraction_downstream[ + upstream_reaches.indices + ] + return fraction_upstream + else: + # Validate that the provided data sum to one. + # TODO: move into validation? + # TODO: validate each time step if provided in transient form. + if "time" in upstream_fraction_data.dims: + upstream_fraction_data = upstream_fraction_data.isel(time=0) + + to_validate = downstream_reaches.copy() + to_validate.data = upstream_fraction_data[downstream_reaches.indices] + fraction_sum = to_validate.sum(axis=1) + # TODO: ideally we add a function to return these bad reaches? 
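+            # e.g. a bifurcation splitting into two downstream reaches with
+            # fractions 0.7 and 0.3 sums to 1.0 and passes; 0.7 and 0.7
+            # would not.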
+ not_one = ~np.isclose(fraction_sum, 1.0) + if not_one.any(): # TODO: ideally we add a function to return these bad reaches? # Printing them here might give (too) large error messages. raise ValueError( - "A reach should have either multiple upstream reaches (confluence)" - "or multiple downstream reaches (furcation), but not both. " - "For complex junctions, the upstream_fraction is ambiguous. " - f"This SFR network contains {n_complex} junctions." + "upstream_fraction does not sum to 1.0 for all reaches." ) - - if upstream_fraction_data is None: - # By default, assume equal split. - n_downstream = downstream_reaches.getnnz(axis=1) - fraction_downstream = 1.0 / n_downstream - # Get upstream_fraction for each reach. - return fraction_downstream[upstream_reaches.indices] - else: - # Validate that the provided data sum to one. - # TODO: move into validation? - # TODO: validate each time step if provided in transient form. - if "time" in upstream_fraction_data.dims: - upstream_fraction_data = upstream_fraction_data.isel(time=0) - - to_validate = downstream_reaches.copy() - to_validate.data = upstream_fraction_data[downstream_reaches.indices] - fraction_sum = to_validate.sum(axis=1) - # TODO: ideally we add a function to return these bad reaches? - not_one = ~np.isclose(fraction_sum, 1.0) - if not_one.any(): - # TODO: ideally we add a function to return these bad reaches? - # Printing them here might give (too) large error messages. - raise ValueError( - "upstream_fraction does not sum to 1.0 for all reaches." - ) - return upstream_fraction_data - - def _packagedata_dataframe( - self, dst_grid, upstream_reaches, downstream_reaches - ) -> pd.DataFrame: - dataset = self.dataset - network = dataset.ugrid.grid - # TODO: assign to which layer? - # Assigning to multiple layers is problematic - # Maybe use river allocate logic? 
- cellid = self._find_cellid(dst_grid) - # TODO: layer currently missing - - package_data_vars = list(self._package_data) - # Derive upstream_fraction if not provided (simple networks) - package_data_vars.pop("upstream_fraction") - - ds = self.dataset[package_data_vars] - if "time" in ds.dims: - ds = ds.isel(time=0) - - df = ds.to_dataframe() - df["upstream_fraction"] = self._derive_upstream_fraction( - dataset["upstream_fraction"], upstream_reaches, downstream_reaches + return upstream_fraction_data + + def _packagedata_dataframe(self) -> pd.DataFrame: + dataset = self.dataset + network = dataset.ugrid.grid + cellid = dataset["cellid"].to_pandas() + + package_data_vars = list(self._package_data) + # Derive upstream_fraction if not provided (simple networks) + package_data_vars.remove("upstream_fraction") + + ds = self.dataset[package_data_vars] + if "time" in ds.dims: + ds = ds.isel(time=0) + + df = ds.to_dataframe() + df["n_connection"] = network.edge_edge_connectivity.getnnz(axis=1) + df["upstream_fraction"] = self._derive_upstream_fraction( + dataset["upstream_fraction"] + ) + # TODO: check for None efficiently + if dataset["diversion_source"].isnull(): + df["number_of_diversions"] = 0 + else: + df["number_of_diversions"] = np.bincount( + dataset["diversion_source"], minlength=network.n_edge ) - if dataset["diversion_source"] is None: - df["number_of_diversions"] = 0 - else: - df["number_of_diversions"] = np.bincount( - dataset["diversion_source"], minlength=network.n_edge - ) - return pd.concat((cellid, df), axis=1) - - def _connectiondata_dataframe(self) -> pd.DataFrame: - # MODFLOW 6 wants the upstream reaches numbered POSITIVE - # and the downstream reaches numbered NEGATIVE - network = self.dataset.ugrid.grid - # Derive the connectivity for each reach (edge) - downstream = network.directed_edge_edge_connectivity - upstream = downstream.transpose().tocsr() - # Increment by 2 because a 0 will be seen as a structural zero; - # and the default fill value of the format method is -1. - downstream.data = downstream.indices + 2 - upstream.data = upstream.indices + 2 - sfr_connectivity = network.format_connectivity_as_dense( - upstream - downstream + dataframe = pd.concat((cellid, df), axis=1) + # MODFLOW 6 is 1-based + dataframe.index += 1 + return dataframe + + def _connectiondata_dataframe(self) -> pd.DataFrame: + INT_MAX = np.iinfo(np.int64).max + # MODFLOW 6 wants the upstream reaches numbered POSITIVE + # and the downstream reaches numbered NEGATIVE + network = self.dataset.ugrid.grid + # Derive the connectivity for each reach (edge) + downstream = network.directed_edge_edge_connectivity + upstream = downstream.transpose().tocsr() + sfr_upstream = network.format_connectivity_as_dense(upstream) + sfr_downstream = network.format_connectivity_as_dense(downstream) + sfr_connectivity = np.hstack((sfr_upstream + 1, -(sfr_downstream + 1))) + sfr_fill = np.hstack((sfr_upstream == -1, sfr_downstream == -1)) + sfr_connectivity[sfr_fill] = INT_MAX + # Use pandas Integer array for NA (masked) values. + df = pd.DataFrame() + for i, column in enumerate(sfr_connectivity.T): + df[i] = pd.arrays.IntegerArray( + # MODFLOW starts numbering at 1. + values=column, + mask=(column == INT_MAX), ) - # Sort decreasing, this'll move the -1 to a middle position, - # but the pandas writer will omit the masked value. - sfr_connectivity.sort(axis=1) - - # Use pandas Integer array for NA (masked) values. 
- df = pd.DataFrame() - for i, column in enumerate(sfr_connectivity.T): - df[i] = pd.arrays.IntegerArray( - # MODFLOW starts numbering at 1. - values=column - 1, - mask=(column == -1), - ) - # Increment the index by one, as MODFLOW numbers ifno from 1. - df.index += 1 - return df + # Increment the index by one, as MODFLOW numbers ifno from 1. + df.index += 1 + return df - def _crosssections_dataframe(self) -> Union[pd.DataFrame, None]: - # TODO: - return None + def _crosssections_dataframe(self) -> Union[pd.DataFrame, None]: + # TODO: + return None - def _diversions_dataframe(self) -> Union[pd.DataFrame, None]: - diversion_source = self.dataset["diversion_source"] - if diversion_source is None: - return None - diversion_df = self.dataset[ - ["diversion_source", "diversion_target", "diversion_priority"] - ].to_dataframe() - diversion_df["diversion_index"] = np.arange(len(diversion_source)) + 1 - # Make sure columns are returned in the right order - return diversion_df[ - [ - "diversion_source", - "diversion_index", - "diversion_target", - "diversion_priority", - ] + def _diversions_dataframe(self) -> Union[pd.DataFrame, None]: + diversion_source = self.dataset["diversion_source"] + if diversion_source.isnull(): + return None + diversion_df = self.dataset[ + ["diversion_source", "diversion_target", "diversion_priority"] + ].to_dataframe() + diversion_df["diversion_index"] = np.arange(len(diversion_source)) + 1 + # Make sure columns are returned in the right order + return diversion_df[ + [ + "diversion_source", + "diversion_index", + "diversion_target", + "diversion_priority", ] + ] - def _initialstages_dataframe(self) -> Union[pd.DataFrame, None]: - stage = self.dataset["stage"] - if stage is not None: - if "time" in stage.dims: - initialstages = stage.isel(time=0) - else: - initialstages = stage - df = initialstages.to_dataframe() - df.index += 1 - return df - - def _perioddata_dataframe(self, globaltimes) -> Union[pd.DataFrame, None]: - period_ds = self.dataset[self._period_data] - edge_dim = self.dataset.ugrid.grid.edge_dimension - - # These variables are both package data and period data. If they - # aren't transient, the package data entry suffices, and they don't - # need to be written in the period block. - # Also drop optional transient entries. - vars_to_drop = [ - var - for var in ("bedk", "manning", "stage", "upstream_fraction") - if "time" not in self.dataset[var].dims - ] + [var for var in self._period_data if self.dataset[var] is None] - period_ds = period_ds.drop_vars(vars_to_drop) - if len(period_ds.data_vars) == 0: - return None - - # Some variables can only be specified in the period block. - # If all are constant over time, create a one-sized time dimension. 
- if "time" not in period_ds: - period_ds = period_ds.expand_dims("time") - period_ds["time"] = [1] - else: - period_ds["time"] = np.searchsorted(period_ds["time"], globaltimes) + 1 - - rows_to_keep = _detect_update(period_ds, edge_dim) - period_df = period_ds.to_dataframe() - updates = period_df.melt(id_vars=[edge_dim, "time"]).iloc[rows_to_keep] - - diversion_source = self.dataset["diversion_source"] - if diversion_source is not None: - # Diversion source contains the edge index - # Detect changes of the flow - diversion_ds = self.dataset[["diversion_flow"]] - diversion_ds["time"] = ( - np.searchsorted(diversion_ds["time"], globaltimes) + 1 - ) - diversion_to_keep = _detect_update(diversion_ds, edge_dim) - # Add the edge_index (IFNO), this'll end up as an index - diversion_ds[edge_dim] = diversion_source - diversion_ds["diversion_index"] = np.arange(len(diversion_source)) + 1 - diversion_df = diversion_ds.to_dataframe() - # Concatenate the entries into a single column so it'll match the other period data. - diversion_df["diversion_entry"] = ( - diversion_df["diversion_index"].astype(str) - + " " - + diversion_df["diversion_flow"].astype(str) - ) - diversion_updates = ( - diversion_df[[edge_dim, "time", "diversion_entry"]] - .melt(id_vars=[edge_dim, "time"]) - .iloc[diversion_to_keep] - ) - updates = pd.concat((updates, diversion_updates)) - - return updates - - def render(self, directory, pkgname, globaltimes, binary) -> str: - d = { - "storage": storage, - "maximum_iterations": maximum_iterations, - "maximum_depth_change": maximum_depth_change, - "length_conversion": length_conversion, - "time_conversion": time_conversion, - "print_input": print_input, - "print_flows": print_flows, - "save_flows": save_flows, - "budget_fileout": budget_fileout, - "budgetcsv_fileout": budgetcsv_fileout, - "stage_fileout": stage_fileout, - "observations": observations, - "water_mover": water_mover, - "timeseries": timeseries, - "nreaches": self.dataset.ugrid.grid.n_edge, - } - return self._template.render(d) - - def write_blockfile( - self, pkgname, globaltimes, write_context: WriteContext, idomain - ): - dir_for_render = write_context.get_formatted_write_directory() - content = self.render( - dir_for_render, pkgname, globaltimes, write_context.use_binary + def _initialstages_dataframe(self) -> Union[pd.DataFrame, None]: + stage = self.dataset["stage"] + if stage.isnull(): + return None + + if "time" in stage.dims: + initialstages = stage.isel(time=0) + else: + initialstages = stage + df = initialstages.to_dataframe() + df.index += 1 + return df + + def _perioddata_dataframe(self, globaltimes) -> Union[pd.DataFrame, None]: + if "time" not in self.dataset.dims: + return None + period_ds = self.dataset[self._period_data] + edge_dim = self.dataset.ugrid.grid.edge_dimension + + # These variables are both package data and period data. If they + # aren't transient, the package data entry suffices, and they don't + # need to be written in the period block. + # Also drop optional transient entries. + vars_to_drop = [ + var + for var in ("bedk", "manning", "stage", "upstream_fraction") + if "time" not in self.dataset[var].dims + ] + [var for var in self._period_data if self.dataset[var] is None] + period_ds = period_ds.drop_vars(vars_to_drop) + if len(period_ds.data_vars) == 0: + return None + + # Some variables can only be specified in the period block. + # If all are constant over time, create a one-sized time dimension. 
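+        # e.g. a constant `status` still produces a single set of entries
+        # under "begin period 1" and is then left untouched afterwards.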
+ if "time" not in period_ds: + period_ds = period_ds.expand_dims("time") + period_ds["time"] = [1] + else: + period_ds["time"] = np.searchsorted(period_ds["time"], globaltimes) + 1 + + rows_to_keep = _detect_update(period_ds, edge_dim) + period_df = period_ds.to_dataframe() + updates = period_df.melt(id_vars=[edge_dim, "time"]).iloc[rows_to_keep] + + diversion_source = self.dataset["diversion_source"] + if diversion_source is not None: + # Diversion source contains the edge index + # Detect changes of the flow + diversion_ds = self.dataset[["diversion_flow"]] + diversion_ds["time"] = ( + np.searchsorted(diversion_ds["time"], globaltimes) + 1 + ) + diversion_to_keep = _detect_update(diversion_ds, edge_dim) + # Add the edge_index (IFNO), this'll end up as an index + diversion_ds[edge_dim] = diversion_source + diversion_ds["diversion_index"] = np.arange(len(diversion_source)) + 1 + diversion_df = diversion_ds.to_dataframe() + # Concatenate the entries into a single column so it'll match the other period data. + diversion_df["diversion_entry"] = ( + diversion_df["diversion_index"].astype(str) + + " " + + diversion_df["diversion_flow"].astype(str) ) - filename = write_context.write_directory / f"{pkgname}.{self._pkg_id}" + diversion_updates = ( + diversion_df[[edge_dim, "time", "diversion_entry"]] + .melt(id_vars=[edge_dim, "time"]) + .iloc[diversion_to_keep] + ) + updates = pd.concat((updates, diversion_updates)) + + return updates + + def render(self, directory, pkgname, globaltimes, binary) -> str: + d = { + "storage": self.dataset["storage"].item(), + "maximum_iterations": self.dataset["maximum_iterations"].item(), + "maximum_depth_change": self.dataset["maximum_depth_change"].item(), + "length_conversion": self.dataset["length_conversion"].item(), + "time_conversion": 86_400.0, # imod-python always assumes days! 
+ "print_input": self.dataset["print_input"].item(), + "print_flows": self.dataset["print_flows"].item(), + "save_flows": self.dataset["save_flows"].item(), + # TODO: + # "budget_fileout": self.dataset["budget_fileout"].item(), + # "budgetcsv_fileout": self.dataset["budgetcsv_fileout"].item(), + "stage_fileout": self.dataset["stage_fileout"].item(), + "nreaches": self.dataset.ugrid.grid.n_edge, + } + return self._template.render(d) + + def write_blockfile(self, pkgname, globaltimes, write_context: WriteContext): + dir_for_render = write_context.get_formatted_write_directory() + content = self.render( + dir_for_render, pkgname, globaltimes, write_context.use_binary + ) + filename = write_context.write_directory / f"{pkgname}.{self._pkg_id}" + + packagedata_df = self._packagedata_dataframe() + with open(filename, "w") as f: + f.write(content) + f.write("\n\n") + + connectiondata_df = self._connectiondata_dataframe() + crosssection_df = self._crosssections_dataframe() packagedata_df = self._packagedata_dataframe() + diversions_df = self._diversions_dataframe() + initialstages_df = self._initialstages_dataframe() + perioddata_df = self._perioddata_dataframe(globaltimes) - with open(filename, "w") as f: - f.write(content) - f.write("\n\n") - - connectiondata_df = self._connectiondata_dataframe() - crosssection_df = self._crosssections_dataframe() - packagedata_df = self._packagedata_dataframe(idomain) - diversions_df = self._diversions_dataframe() - initialstages_df = self._initialstages_dataframe() - perioddata_df = self._perioddata_dataframe(globaltimes) - - self._write_table_section(f, packagedata_df, title="packagedata") - if crosssection_df is not None: - self._write_table_section(f, crosssection_df, title="crossections") - self._write_table_section(f, connectiondata_df, title="connectiondata") - if diversions_df is not None: - self._write_table_section(f, diversions_df, title="diversions") - if initialstages_df is not None: + self._write_table_section( + f, packagedata_df, title="packagedata", index=True + ) + if crosssection_df is not None: + self._write_table_section(f, crosssection_df, title="crossections") + self._write_table_section( + f, connectiondata_df, title="connectiondata", index=True + ) + if diversions_df is not None: + self._write_table_section(f, diversions_df, title="diversions") + if initialstages_df is not None: + self._write_table_section( + f, initialstages_df, title="initialstages", index=True + ) + if perioddata_df is not None: + # Iterate over the periods + for period_number, period_df in perioddata_df.groupby("time"): self._write_table_section( - f, initialstages_df, title="initialstages" + f, period_df, title=f"period {period_number}" ) - if perioddata_df is not None: - # Iterate over the periods - for period_number, period_df in perioddata_df.groupby("time"): - self._write_table_section( - f, period_df, title=f"period {period_number}" - ) - return + return - # Overloaded, neutralized methods - # TODO: would be more convenient to move these into a single function to overload? + # Overloaded, neutralized methods + # TODO: would be more convenient to move these into a single function to overload? 
+    def _package_data_to_sparse(self):
+        return
+
+    def fill_stress_perioddata(self):
+        # this function is called from packagebase and should do nothing in this context
+        return
+
+    def _write_perioddata(self, directory, pkgname, binary):
+        # this function is called from packagebase and should do nothing in this context
+        return
+
+    def is_splitting_supported(self) -> bool:
+        return False
+
+    def is_regridding_supported(self) -> bool:
+        return False
+
+    def is_clipping_supported(self) -> bool:
+        return False

From 4d84fc066450cef6f91d8d293b069362bd56c21a Mon Sep 17 00:00:00 2001
From: Huite
Date: Fri, 28 Mar 2025 12:05:15 +0100
Subject: [PATCH 4/4] Basic output stages and budget support (in example)

---
 examples/mf6/streamflow_routing.py | 164 ++++++++++++++++++++++++++++-
 imod/mf6/sfr.py                    |   5 +-
 imod/templates/mf6/gwf-sfr.j2      |   2 +
 3 files changed, 166 insertions(+), 5 deletions(-)

diff --git a/examples/mf6/streamflow_routing.py b/examples/mf6/streamflow_routing.py
index 16876e529..ff397ea5c 100644
--- a/examples/mf6/streamflow_routing.py
+++ b/examples/mf6/streamflow_routing.py
@@ -59,6 +59,10 @@
 uds["bedk"] = xr.DataArray(np.full(3, 0.1), dims=[edgedim])
 uds["manning_n"] = xr.DataArray(np.full(3, 0.03), dims=[edgedim])
 uds["layer"] = xr.DataArray(np.full(3, 1), dims=[edgedim])
+uds["save_flows"] = True
+uds["stage_fileout"] = "sfr-stage-fileout.bin"
+uds["budget_fileout"] = "sfr-budget-fileout.bin"
+uds["budgetcsv_fileout"] = "sfr-budget-fileout.csv"
 
 sfr = imod.mf6.StreamFlowRouting(**uds)
 
@@ -118,7 +122,7 @@
     additional_times=["2000-01-01", "2000-01-02", "2000-01-03", "2000-01-04"]
 )
 
-modeldir = Path("examples/mf6/stream-routing")
+modeldir = Path("stream-routing")
 simulation.write(modeldir)
 # %%
 simulation.run()
@@ -129,8 +133,164 @@
     modeldir / "GWF_1/GWF_1.hds",
     modeldir / "GWF_1/dis.dis.grb",
 )
+head.isel(time=0, layer=0).plot.contourf()
+
 # %%
-head.isel(time=0, layer=0).plot.contourf()
+budgets = imod.mf6.open_cbc(
+    modeldir / "GWF_1/GWF_1.cbc",
+    modeldir / "GWF_1/dis.dis.grb",
+    flowja=True,
+)
+sfr_budgets = budgets["sfr_sfr"].compute()
+sfr_budgets.isel(time=0, layer=0).plot()
+
+# %%
+
+import os
+
+import dask
+import dask.array
+
+from imod.mf6.out.common import read_times_dvs
+
+FloatArray = np.ndarray
+
+
+def read_dvs_timestep(path, n: int, pos: int) -> FloatArray:
+    """
+    Reads all values of one timestep.
+    """
+    with open(path, "rb") as f:
+        f.seek(pos)
+        f.seek(52, 1)  # skip the 52-byte header (kstp, kper, pertim, totim, text, dims)
+        a1d = np.fromfile(f, np.float64, n)
+    return a1d
+
+
+def open_sfr_stages(path, network):
+    # TODO: get rid of indices, just propagate size?
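+    # Assumes the standard MODFLOW 6 dependent-variable file layout: per time
+    # step, a 52-byte header (kstp, kper, pertim, totim, text, and three array
+    # dimensions) followed by n_edge float64 stage values.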
+    indices = np.arange(network.n_edge)
+    filesize = os.path.getsize(path)
+    ntime = filesize // (52 + (indices.size * 8))
+    coords = {"time": read_times_dvs(path, ntime, indices)}
+    dask_list = []
+    for i in range(ntime):
+        pos = i * (52 + indices.size * 8)
+        a = dask.delayed(read_dvs_timestep)(path, network.n_edge, pos)
+        x = dask.array.from_delayed(a, shape=(network.n_edge,), dtype=np.float64)
+        dask_list.append(x)
+
+    daskarr = dask.array.stack(dask_list, axis=0)
+    da = xr.DataArray(daskarr, coords, ("time", network.edge_dimension), name="sfr")
+    return xu.UgridDataArray(da, network)
+
+
+sfr_stage = open_sfr_stages(modeldir / "sfr-stage-fileout.bin", network)
+sfr_stage.isel(time=0).ugrid.plot()
+
+# %%
+
+
+from scipy.sparse import csr_matrix
+
+from imod.mf6.out import cbc
+
+
+def read_imeth6_budget_column(
+    cbc_path,
+    count,
+    dtype,
+    pos,
+) -> np.ndarray:
+    # Wrapped function returning the budget column only.
+    a = cbc.read_imeth6_budgets(cbc_path, count, dtype, pos)
+    return a["budget"]
+
+
+def process_sfr_flowja(sfr_flowja, network):
+    # If I understand correctly, SFR doesn't allow complex junctions.
+    # This means that a junction is either a confluence (N to 1),
+    # or a furcation (1 to N).
+    # In either case, either the flow from upstream or the flow to downstream
+    # is single-valued and can be uniquely identified.
+    # MODFLOW 6 reach numbers are 1-based; shift to 0-based for scipy.
+    i = sfr_flowja["id1"] - 1
+    j = sfr_flowja["id2"] - 1
+    flow = sfr_flowja["flow"]
+    flow_area = sfr_flowja["flow-area"]
+
+    shape = (network.n_edge, network.n_edge)
+    connectivity_matrix = csr_matrix((flow, (i, j)), shape=shape)
+    # Sum along the rows (axis=1, index=i): flow to downstream
+    # Sum along the columns (axis=0, index=j): flow from upstream
+    downstream_flow = connectivity_matrix.sum(axis=1)
+    upstream_flow = connectivity_matrix.sum(axis=0)
+    connectivity_matrix.data = flow_area
+    downstream_flow_area = connectivity_matrix.sum(axis=1)
+    upstream_flow_area = connectivity_matrix.sum(axis=0)
+
+    return downstream_flow, upstream_flow, downstream_flow_area, upstream_flow_area
+
+
+def open_sfr_cbc(cbc_path, network):
+    headers = cbc.read_cbc_headers(cbc_path)
+    data = {}
+    for key, header_list in headers.items():
+        if key in ("flow-ja-face_sfr", "gwf_gwf_1", "storage_sfr"):
+            # TODO: these have multiple columns
+            # flow-ja-face contains:
+            # * reach_i
+            # * reach_j
+            # * flow
+            # * flow_area
+            #
+            # This should be split into four DataArrays (see above).
+            #
+            # gwf_gwf_1 contains:
+            # * reach_i
+            # * modflow_gwf_i
+            # * reach-aquifer flow
+            # * reach-aquifer flow area
+            #
+            # Separate these into two DataArrays.
+            #
+            # storage contains:
+            # * reach_i
+            # * reach_i (the same)
+            # * storage
+            # * volume
+            #
+            # Separate these into two DataArrays.
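+            # These records use the list-based IMETH=6 budget layout: each
+            # row carries two id columns plus one or more (auxiliary) value
+            # columns, so they need per-column splitting rather than the
+            # generic single-column read below.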
+            continue
+
+        dtype = np.dtype(
+            [("id1", np.int32), ("id2", np.int32), ("budget", np.float64)]
+            + [(name, np.float64) for name in header_list[0].auxtxt]
+        )
+
+        dask_list = []
+        time = []
+        for header in header_list:
+            a = dask.delayed(read_imeth6_budget_column)(
+                cbc_path, network.n_edge, dtype, header.pos
+            )
+            x = dask.array.from_delayed(a, shape=(network.n_edge,), dtype=np.float64)
+            dask_list.append(x)
+            time.append(header.totim)
+
+        daskarr = dask.array.stack(dask_list, axis=0)
+        data[key] = xu.UgridDataArray(
+            xr.DataArray(
+                daskarr, coords={"time": time}, dims=("time", network.edge_dimension)
+            ),
+            network,
+        )
+
+    return data
+
+
+sfr_budgets = open_sfr_cbc(modeldir / "sfr-budget-fileout.bin", network)
+
+# %%
+list(sfr_budgets.keys())
 
 # %%

diff --git a/imod/mf6/sfr.py b/imod/mf6/sfr.py
index d41cd4dbf..f988dd811 100644
--- a/imod/mf6/sfr.py
+++ b/imod/mf6/sfr.py
@@ -612,9 +612,8 @@ def render(self, directory, pkgname, globaltimes, binary) -> str:
             "print_input": self.dataset["print_input"].item(),
             "print_flows": self.dataset["print_flows"].item(),
             "save_flows": self.dataset["save_flows"].item(),
-            # TODO:
-            # "budget_fileout": self.dataset["budget_fileout"].item(),
-            # "budgetcsv_fileout": self.dataset["budgetcsv_fileout"].item(),
+            "budget_fileout": self.dataset["budget_fileout"].item(),
+            "budgetcsv_fileout": self.dataset["budgetcsv_fileout"].item(),
             "stage_fileout": self.dataset["stage_fileout"].item(),
             "nreaches": self.dataset.ugrid.grid.n_edge,
         }
diff --git a/imod/templates/mf6/gwf-sfr.j2 b/imod/templates/mf6/gwf-sfr.j2
index cc18f7005..081338614 100644
--- a/imod/templates/mf6/gwf-sfr.j2
+++ b/imod/templates/mf6/gwf-sfr.j2
@@ -9,6 +9,8 @@ begin options
     print_flows{% endif %}
 {%- if save_flows is defined %}
     save_flows{% endif %}
+{%- if stage_fileout is defined %}
+    stage fileout {{stage_fileout}}{% endif %}
 {%- if budget_fileout is defined %}
     budget fileout {{budget_fileout}}{% endif %}
 {%- if budgetcsv_fileout is defined %}