diff --git a/freegs/machine.py b/freegs/machine.py index 3062634..9e34efe 100644 --- a/freegs/machine.py +++ b/freegs/machine.py @@ -858,6 +858,10 @@ def plot(self, axis=None, show=True): """ for label, coil in self.coils: axis = coil.plot(axis=axis, show=False) + + if self.wall is not None: + axis.plot(self.wall.R, self.wall.Z, 'k-') + if show: import matplotlib.pyplot as plt diff --git a/freegs/omasio.py b/freegs/omasio.py new file mode 100644 index 0000000..37fc85d --- /dev/null +++ b/freegs/omasio.py @@ -0,0 +1,258 @@ +# IDS schema view: https://gafusion.github.io/omas/schema.html +from __future__ import annotations + +from typing import Tuple, Type, List, Dict + +import omas +import numpy as np + +from freegs.coil import Coil +from .machine import ShapedCoil, FilamentCoil, Circuit, Wall, Machine + +# OMAS coil types: +# multi-element coil is always translated to FilamentCoil (only support rectangular geometry) +# 'outline', 'rectangle' --> ShapeCoil +# oblique, arcs of circle, thick line and annulus are not supported at the moment +OMAS_COIL_TYPES = {1: 'outline', + 2: 'rectangle', + 3: 'oblique', + 4: 'arcs of circle', + 5: 'annulus', + 6: 'thick line'} + + +def _identify_name(ods: omas.ODS) -> str | None: + """ Identify coil name from OMAS data. + Primary identifier is 'name', secondary is 'identifier'. + Note: Not sure what is an intended difference between 'name' and 'identifier'. + """ + if "name" in ods: + return ods["name"] + elif "identifier" in ods: + return ods["identifier"] + else: + return None + + +def _load_omas_supplies(ods: omas.ODS) -> List[Dict]: + """ Load power supplies names from OMAS data. + :param ods: 'pf_active.supply.:' data""" + # FreeGS does not have PowerSupply class, so we return data as list of dicts + # The voltages can be loaded in the same way as currents to support equilibrium evolution. + power_supplies_names = [{"name": _identify_name(ods[idx]), + "current": _load_omas_current(ods[idx])} for idx in ods] + return power_supplies_names + + +def _identify_geometry_type(ods: omas.ODS) -> str | None: + """ Identify coil geometry type from OMAS data. + :param ods: pf_active.coil.element data + :return: coil geometry type + """ + geometry_type_idx = ods["geometry"]["geometry_type"] if "geometry" in ods else None + geometry_type = OMAS_COIL_TYPES[geometry_type_idx] if geometry_type_idx else None + + if not geometry_type: + if "outline" in ods["geometry"]: + geometry_type = "outline" + elif "rectangle" in ods["geometry"]: + geometry_type = "rectangle" + # The IDS definition of annulus is unclear to me. + # elif "annulus" in ods["geometry"]: + # geometry_type = "annulus" + else: + geometry_type = None + + return geometry_type + + +def _load_omas_current(ods: omas.ODS) -> float: + """ Load current from OMAS data. + + :param ods: any IDS substructure which can contain current structure data""" + # Read current + if "current" in ods: + if len(ods["current"]["data"]) > 1: + print( + f"Warning: multiple circuit currents found. Using first one for time: " + f"{ods['current']['time'][0]}") + circuit_current = ods["current"]["data"][0] + else: + circuit_current = 0.0 + + return circuit_current + + +def _circuit_connection_to_linear(circuit_structure: np.ndarray, n_supplies: int) -> Tuple[Tuple, Tuple]: + # TODO This function does not support non-linear circuits and connections which generate opposite currents. + num_rows, num_cols = circuit_structure.shape + + supply_idx = set() + coil_idx = set() + + # Loop through each node in the circuit + for i in range(num_rows): + # Loop through each supply or coil side connected to the node + node_count = 0 + for j in range(num_cols): + if circuit_structure[i, j] == 1: + # Determine if the connection is to a supply or coil + if j < 2 * n_supplies: + index = j // 2 + supply_idx.add(index) + else: + index = (j - 2 * n_supplies) // 2 + coil_idx.add(index) + node_count += 1 + if node_count > 2: + raise ValueError(f"Non-linear circuit structure. Node {i} has more than 2 connections.") + + return tuple(supply_idx), tuple(coil_idx) + + +def _load_omas_circuit(ods: omas.ODS, coils: List[Tuple[str, Coil]], power_supplies: List[Dict]) -> Tuple[str, Circuit]: + """ Load circuit from OMAS data. + :param ods: 'pf_active.circuit.:' data""" + # Identify circuit name + + # IDS circuit description can be found here: https://gafusion.github.io/omas/schema/schema_pf%20active.html. + + # Get linear circuit (coil and supply) structure + supply_idx, coil_idx = _circuit_connection_to_linear(ods["connections"], len(power_supplies)) + + if len(supply_idx) == 0 or len(coil_idx) == 0: + raise ValueError(f"Invalid circuit structure. No supplies or coils found for circuit {ods}.") + + if len(supply_idx) > 1: + print(f"Warning: multiple supplies found for circuit {ods}.") + + # Construct circuit name. First from circuit name, then from supply name, then from coil names. + circuit_name = _identify_name(ods) + if not circuit_name: + if power_supplies[supply_idx[0]]["name"]: + supply_names = [power_supplies[supply_idx[idx]]["name"] for idx in supply_idx if + power_supplies[supply_idx[idx]]["name"]] + circuit_name = "+".join(supply_names) + elif coils[coil_idx[0]][0]: + coil_names = [coil[0] for coil in coils if coil[0]] + circuit_name = "+".join(coil_names) + else: + raise ValueError(f"Unable to identify circuit name for circuit {ods}.") + + circuit_current = _load_omas_current(ods) + + # Init FreeGS circuit + # TODO: Recognize correctly the multiplier for the circuit current + circuit = Circuit([(coils[idx][0], coils[idx][1], 1.0) for idx in coil_idx], circuit_current) + return circuit_name, circuit + + +def load_omas_circuits(ods: omas.ODS) -> List[Tuple[str, Circuit]] | None: + coils = load_omas_coils(ods) + power_supplies = _load_omas_supplies(ods["pf_active.supply"]) + if "pf_active.circuit" not in ods: + return None + return [_load_omas_circuit(ods["pf_active.circuit"][idx], coils, power_supplies) for idx in + ods["pf_active.circuit"]] + + +def _load_omas_coil(ods: omas.ODS) -> Tuple[str, Coil]: + """ Load coil from OMAS data. + :param ods: 'pf_active.coil.:' data""" + # Identify coil name + coil_name = _identify_name(ods) + + # Read current + coil_current = _load_omas_current(ods) + + # Multicoil or simple coil? + if len(ods["element"]) > 1: + r_filaments = ods["element.:.geometry.rectangle.r"] + z_filaments = ods["element.:.geometry.rectangle.z"] + turns = ods["element.:.turns_with_sign"] + if not np.all(np.abs(turns) == 1): + raise ValueError("Multicoil with non-unit turns is not supported yet.") + + # TODO: check if turns are interpreted correctly + coil = FilamentCoil(r_filaments, z_filaments, current=coil_current, turns=len(r_filaments)) + else: + if not coil_name: + coil_name = _identify_name(ods["element"][0]) + + if not coil_name: + raise ValueError(f"Coil name not identified: \n {ods}") + + geometry_type = _identify_geometry_type(ods["element"][0]) + + # Read turns: + if "turns" in ods["element"][0]: + turns = ods["element"][0]["turns_with_sign"] + else: + turns = 1 + + # Init FreeGS coil + if geometry_type == "outline": + outline_r = ods["element"][0]["geometry"]["outline"]["r"] + outline_z = ods["element"][0]["geometry"]["outline"]["z"] + coil = ShapedCoil(list(zip(outline_r, outline_z)), coil_current, turns) + elif geometry_type == "rectangle": + r_centre = ods["element"][0]["geometry"]["rectangle"]["r"] + z_centre = ods["element"][0]["geometry"]["rectangle"]["z"] + width = ods["element"][0]["geometry"]["rectangle"]["width"] + height = ods["element"][0]["geometry"]["rectangle"]["height"] + coil_r = [r_centre - width / 2, r_centre + width / 2, r_centre + width / 2, r_centre - width / 2] + coil_z = [z_centre - height / 2, z_centre - height / 2, z_centre + height / 2, z_centre + height / 2] + coil = ShapedCoil(list(zip(coil_r, coil_z)), coil_current, turns) + else: + raise ValueError(f"Coil geometry type {geometry_type} not supported yet.") + + if coil_name is None: + raise ValueError(f"Coil name not identified: \n {ods}") + + coil_tuple = (coil_name, coil) + return coil_tuple + + +def load_omas_coils(ods: omas.ODS) -> List[Tuple[str, Coil]]: + coils = [_load_omas_coil(ods["pf_active.coil"][idx]) for idx in ods["pf_active.coil"]] + return coils + + +def _load_omas_limiter(ods: omas.ODS): + # Look for the limiter in the ODS. + + # Try to find limiter description in the OMAS data. + # If not found, raise. + if "limiter" not in ods["wall.description_2d.:"]: + return None + + # Identify the first occurrence of limiter id ods + limiter_id = next((idx for idx in ods["wall.description_2d"] if "limiter" in ods["wall.description_2d"][idx])) + limiter_ods = ods["wall.description_2d"][limiter_id]["limiter"] + + # Currently is supported only continuous limiter + if limiter_ods["type.index"] != 0: + raise ValueError("Only continuous limiter is supported yet.") + + # Create FreeGS wall object + limiter = Wall(limiter_ods["unit.0.outline.r"], limiter_ods["unit.0.outline.z"]) + + return limiter + + +def load_omas_machine(ods: omas.ODS): + """ Load machine from OMAS data. + :param ods: OMAS data""" + + # Load circuits + coils = load_omas_circuits(ods) + if not coils: + coils = load_omas_coils(ods) + + # Load limiter + limiter = _load_omas_limiter(ods) + + # Load machine + machine = Machine(coils, limiter) + + return machine diff --git a/freegs/test_omas.py b/freegs/test_omas.py new file mode 100644 index 0000000..3fde7c4 --- /dev/null +++ b/freegs/test_omas.py @@ -0,0 +1,138 @@ +import matplotlib.pyplot as plt +import numpy as np +import omas +import pytest + +import freegs +from freegs.omasio import load_omas_coils, load_omas_machine, load_omas_circuits + + +# ODS feature with pf_active data +@pytest.fixture +def example_ods(): + ods = omas.ods_sample() + + r_coil = 1.3 + z_coil = -1.5 + dr_coil = 0.1 + dz_coil = 0.1 + n_elements = 9 + + # Add a coil with 9 filaments: + ods["pf_active.coil.+.name"] = "multicoil" + + for i in range(n_elements): + icol = i % 3 + irow = i // 3 + rc = r_coil - 3 * dr_coil + irow * dr_coil + zc = z_coil - 3 * dz_coil + icol * dz_coil + + ods["pf_active.coil.-1.element.+.geometry"]["rectangle"]["r"] = rc + ods["pf_active.coil.-1.element.-1.geometry"]["rectangle"]["z"] = zc + ods["pf_active.coil.-1.element.-1.geometry"]["rectangle"]["width"] = 0.1 + ods["pf_active.coil.-1.element.-1.geometry"]["rectangle"]["height"] = 0.1 + ods["pf_active.coil.-1.element.-1.geometry"]["geometry_type"] = 2 + + # For the consistency with `example_circuit_ods` fixture: + ods["pf_active.coil.-1.element.0.identifier"] = "sampl_circuit" + + return ods + + +@pytest.fixture +def example_circuit_ods(example_ods): + ods = example_ods.copy() + + n_coils = len(ods["pf_active.coil"]) + + # Generate Power Supplies: + for coil_idx in ods["pf_active.coil"]: + coil = ods["pf_active.coil"][coil_idx] + ods["pf_active.supply.+.name"] = "PS_" + coil["element.0.identifier"] + + for coil_idx in ods["pf_active.coil"]: + coil = ods["pf_active.coil"][coil_idx] + # Add a circuit: + ods["pf_active.circuit.+.name"] = "C_" + coil["element.0.identifier"] + + # Connect both poles in a loop + connections = np.zeros((2, 4 * n_coils), dtype=int) + # Connection on two poles + connections[0, 2 * coil_idx] = 1 + connections[0, 2 * coil_idx + 2 * n_coils] = 1 + # Connection of the second two poles + connections[1, 2 * coil_idx + 1] = 1 + connections[1, 2 * coil_idx + 2 * n_coils + 1] = 1 + + ods["pf_active.circuit.-1.connections"] = connections + + return ods + + +def test_omas_coils(example_ods): + coils = load_omas_coils(example_ods) + + coil_names = [coil[0] for coil in coils] + + assert "samp0" in coil_names + assert "samp1" in coil_names + assert "samp2" in coil_names + + +def test_omas_circuits(example_circuit_ods): + circuits = load_omas_circuits(example_circuit_ods) + + circuit_names = [circuit[0] for circuit in circuits] + + assert "C_samp0" in circuit_names + assert "C_samp1" in circuit_names + assert "C_samp2" in circuit_names + + +def test_omas_machine(example_ods): + machine = load_omas_machine(example_ods) + + +def _test_freegs_inverse(machine, ods): + # Inverse problem + eq = freegs.Equilibrium(machine, + Rmin=0.6, Rmax=2.8, + Zmin=-2.0, Zmax=1.8, + nx=65, ny=65) + + ip = ods["equilibrium"]["time_slice"][0]["global_quantities"]["ip"] + p_ax = ods["equilibrium"]["time_slice"][0]["profiles_1d"]["pressure"][0] + btor = ods["equilibrium"]["time_slice"][0]["global_quantities"]["magnetic_axis"]["b_field_tor"] + rax = ods["equilibrium"]["time_slice"][0]["global_quantities"]["magnetic_axis"]["r"] + f0 = btor * rax + + profiles = freegs.jtor.ConstrainPaxisIp(eq, p_ax, ip, f0) + + # Try some circular equilibrium + r_lcfs = ods["equilibrium"]["time_slice"][0]["boundary"]["outline"]["r"] + z_lcfs = ods["equilibrium"]["time_slice"][0]["boundary"]["outline"]["z"] + + isoflux = [(r_lcfs[0], z_lcfs[0], r, z) for r, z in zip(r_lcfs[1:], z_lcfs[1:])] + constraints = freegs.control.constrain(isoflux=isoflux) + + freegs.solve(eq, profiles, constraints) + + # ax = plt.gca() + # ax.set_aspect("equal") + # eq.plot(axis=ax, show=False) + # machine.plot(axis=ax, show=False) + # constraints.plot(axis=ax, show=False) + # plt.show() + + +# Run only on request, since it takes a while to run +@pytest.mark.skip(reason="Run only on request") +def test_omas_test_reconstruction(example_ods): + machine = load_omas_machine(example_ods) + _test_freegs_inverse(machine, example_ods) + + +@pytest.mark.skip(reason="Run only on request") +def test_omas_w_circuits_test_reconstruction(example_circuit_ods): + machine = load_omas_machine(example_circuit_ods) + _test_freegs_inverse(machine, example_circuit_ods) diff --git a/pyproject.toml b/pyproject.toml index beff0ff..3698e6e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,8 @@ dependencies = [ "Shapely>=1.7.1", "importlib-metadata<4.3,>=1.1.0", "freeqdsk>=0.1.0", + "omas>=0.56.0", + ] dynamic = ["version"] diff --git a/requirements.txt b/requirements.txt index fce5dd3..6b94d57 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ h5py>=2.10.0 Shapely>=1.7.1 importlib-metadata<4.3,>=1.1.0 freeqdsk +omas>=0.56.0