Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get global ocean resources and time steps based on mesh size #631

Merged
merged 6 commits into from
May 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 98 additions & 40 deletions compass/ocean/tests/global_ocean/forward.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
import time
from importlib.resources import contents

from compass.model import run_model
from compass.ocean.tests.global_ocean.metadata import (
add_mesh_and_init_metadata,
)
from compass.ocean.tests.global_ocean.tasks import get_ntasks_from_cell_count
from compass.step import Step
from compass.testcase import TestCase

Expand All @@ -25,13 +27,18 @@ class ForwardStep(Step):
time_integrator : {'split_explicit', 'RK4'}
The time integrator to use for the forward run

resources_from_config : bool
Whether to get ``ntasks``, ``min_tasks`` and ``openmp_threads`` from
config options
dynamic_ntasks : bool
Whether the target and minimum number of MPI tasks (``ntasks`` and
``min_tasks``) are computed dynamically from the number of cells
in the mesh

get_dt_from_min_res : bool
Whether to automatically compute `config_dt` and `config_btr_dt`
namelist options from the minimum resolution of the mesh
"""
def __init__(self, test_case, mesh, init, time_integrator, name='forward',
subdir=None, ntasks=None, min_tasks=None,
openmp_threads=None):
def __init__(self, test_case, mesh, time_integrator, init=None,
name='forward', subdir=None, ntasks=None, min_tasks=None,
openmp_threads=None, get_dt_from_min_res=True):
"""
Create a new step

Expand All @@ -43,12 +50,12 @@ def __init__(self, test_case, mesh, init, time_integrator, name='forward',
mesh : compass.ocean.tests.global_ocean.mesh.Mesh
The test case that produces the mesh for this run

init : compass.ocean.tests.global_ocean.init.Init
The test case that produces the initial condition for this run

time_integrator : {'split_explicit', 'RK4'}
The time integrator to use for the forward run

init : compass.ocean.tests.global_ocean.init.Init, optional
The test case that produces the initial condition for this run

name : str, optional
the name of the step

Expand All @@ -66,6 +73,10 @@ def __init__(self, test_case, mesh, init, time_integrator, name='forward',

openmp_threads : int, optional
the number of OpenMP threads the step will use

get_dt_from_min_res : bool
Whether to automatically compute `config_dt` and `config_btr_dt`
namelist options from the minimum resolution of the mesh
"""
self.mesh = mesh
self.init = init
Expand All @@ -79,7 +90,9 @@ def __init__(self, test_case, mesh, init, time_integrator, name='forward',
if (ntasks is None) != (openmp_threads is None):
raise ValueError('You must specify both ntasks and openmp_threads '
'or neither.')
self.resources_from_config = ntasks is None

self.dynamic_ntasks = (ntasks is None and min_tasks is None)
self.get_dt_from_min_res = get_dt_from_min_res

# make sure output is double precision
self.add_streams_file('compass.ocean.streams', 'streams.output')
Expand All @@ -93,7 +106,7 @@ def __init__(self, test_case, mesh, init, time_integrator, name='forward',
self.add_namelist_file(
'compass.ocean.tests.global_ocean', 'namelist.wisc')

if init.with_bgc:
if init is not None and init.with_bgc:
self.add_namelist_file(
'compass.ocean.tests.global_ocean', 'namelist.bgc')
self.add_streams_file(
Expand All @@ -113,38 +126,75 @@ def __init__(self, test_case, mesh, init, time_integrator, name='forward',
if mesh_stream in mesh_package_contents:
self.add_streams_file(mesh_package, mesh_stream)

if mesh.with_ice_shelf_cavities:
initial_state_target = \
f'{init.path}/ssh_adjustment/adjusted_init.nc'
else:
initial_state_target = \
f'{init.path}/initial_state/initial_state.nc'
self.add_input_file(filename='init.nc',
work_dir_target=initial_state_target)
self.add_input_file(
filename='forcing_data.nc',
work_dir_target=f'{init.path}/initial_state/'
f'init_mode_forcing_data.nc')
self.add_input_file(
filename='graph.info',
work_dir_target=f'{init.path}/initial_state/graph.info')
if init is not None:
if mesh.with_ice_shelf_cavities:
initial_state_target = \
f'{init.path}/ssh_adjustment/adjusted_init.nc'
else:
initial_state_target = \
f'{init.path}/initial_state/initial_state.nc'
self.add_input_file(filename='init.nc',
work_dir_target=initial_state_target)
self.add_input_file(
filename='forcing_data.nc',
work_dir_target=f'{init.path}/initial_state/'
f'init_mode_forcing_data.nc')
self.add_input_file(
filename='graph.info',
work_dir_target=f'{init.path}/initial_state/graph.info')

self.add_model_as_input()

def setup(self):
"""
Set up the test case in the work directory, including downloading any
dependencies
Set the number of MPI tasks and the time step based on config options
"""
self._get_resources()
config = self.config
self.dynamic_ntasks = (self.ntasks is None and self.min_tasks is None)

if self.dynamic_ntasks:
mesh_filename = os.path.join(self.work_dir, 'init.nc')
self.ntasks, self.min_tasks = get_ntasks_from_cell_count(
config=config, at_setup=True, mesh_filename=mesh_filename)
self.openmp_threads = config.getint('global_ocean',
'forward_threads')

if self.get_dt_from_min_res:
dt, btr_dt = self._get_dts()
if self.time_integrator == 'split_explicit':
self.add_namelist_options({'config_dt': dt,
'config_btr_dt': btr_dt})
else:
# RK4, so use the smaller time step
self.add_namelist_options({'config_dt': btr_dt})

def constrain_resources(self, available_resources):
"""
Update resources at runtime from config options
"""
self._get_resources()
config = self.config
if self.dynamic_ntasks:
mesh_filename = os.path.join(self.work_dir, 'init.nc')
self.ntasks, self.min_tasks = get_ntasks_from_cell_count(
config=config, at_setup=False, mesh_filename=mesh_filename)
self.openmp_threads = config.getint('global_ocean',
'forward_threads')
super().constrain_resources(available_resources)

def runtime_setup(self):
"""
Update the time step based on config options that a user might have
changed
"""
if self.get_dt_from_min_res:
dt, btr_dt = self._get_dts()
if self.time_integrator == 'split_explicit':
self.update_namelist_at_runtime({'config_dt': dt,
'config_btr_dt': btr_dt})
else:
# RK4, so use the smaller time step
self.update_namelist_at_runtime({'config_dt': btr_dt})

def run(self):
"""
Run this step of the testcase
Expand All @@ -153,16 +203,24 @@ def run(self):
add_mesh_and_init_metadata(self.outputs, self.config,
init_filename='init.nc')

def _get_resources(self):
# get the these properties from the config options
if self.resources_from_config:
config = self.config
self.ntasks = config.getint(
'global_ocean', 'forward_ntasks')
self.min_tasks = config.getint(
'global_ocean', 'forward_min_tasks')
self.openmp_threads = config.getint(
'global_ocean', 'forward_threads')
def _get_dts(self):
"""
Get the time step and barotropic time steps
"""
config = self.config
# dt is proportional to resolution: default 30 seconds per km
dt_per_km = config.getfloat('global_ocean', 'dt_per_km')
btr_dt_per_km = config.getfloat('global_ocean', 'btr_dt_per_km')
min_res = config.getfloat('global_ocean', 'min_res')

dt = dt_per_km * min_res
# https://stackoverflow.com/a/1384565/7728169
dt = time.strftime('%H:%M:%S', time.gmtime(dt))

btr_dt = btr_dt_per_km * min_res
btr_dt = time.strftime('%H:%M:%S', time.gmtime(btr_dt))

return dt, btr_dt


class ForwardTestCase(TestCase):
Expand Down
26 changes: 16 additions & 10 deletions compass/ocean/tests/global_ocean/global_ocean.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,31 @@ cull_mesh_max_memory = 1000

## each mesh should replace these with appropriate values in its config file

# the number of cells per core to aim for
goal_cells_per_core = 200
# the approximate maximum number of cells per core (the test will fail if too
# few cores are available)
max_cells_per_core = 2000

# the approximate number of cells in the mesh, to be estimated for each mesh
approx_cell_count = <<<Missing>>>

# time step per resolution (s/km), since dt is proportional to resolution
dt_per_km = 30
# barotropic time step per resolution (s/km)
btr_dt_per_km = 1.5

## config options related to the initial_state step
# number of cores to use
init_ntasks = 4
init_ntasks = 36
# minimum of cores, below which the step fails
init_min_tasks = 1
# maximum memory usage allowed (in MB)
init_max_memory = 1000
init_min_tasks = 8
# number of threads
init_threads = 1

## config options related to the forward steps
# number of cores to use
forward_ntasks = 4
# minimum of cores, below which the step fails
forward_min_tasks = 1
# number of threads
forward_threads = 1
# maximum memory usage allowed (in MB)
forward_max_memory = 1000

## metadata related to the mesh
# whether to add metadata to output files
Expand Down
47 changes: 5 additions & 42 deletions compass/ocean/tests/global_ocean/init/ssh_adjustment.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from importlib.resources import contents

from compass.ocean.iceshelf import adjust_ssh
from compass.step import Step
from compass.ocean.tests.global_ocean.forward import ForwardStep


class SshAdjustment(Step):
class SshAdjustment(ForwardStep):
"""
A step for iteratively adjusting the pressure from the weight of the ice
shelf to match the sea-surface height as part of ice-shelf 2D test cases
Expand All @@ -18,21 +16,9 @@ def __init__(self, test_case):
test_case : compass.ocean.tests.global_ocean.init.Init
The test case this step belongs to
"""
super().__init__(test_case=test_case, name='ssh_adjustment')

# make sure output is double precision
self.add_streams_file('compass.ocean.streams', 'streams.output')

self.add_namelist_file(
'compass.ocean.tests.global_ocean', 'namelist.forward')

mesh_package = test_case.mesh.package
mesh_package_contents = list(contents(mesh_package))
mesh_namelists = ['namelist.forward',
'namelist.split_explicit']
for mesh_namelist in mesh_namelists:
if mesh_namelist in mesh_package_contents:
self.add_namelist_file(mesh_package, mesh_namelist)
super().__init__(test_case=test_case, mesh=test_case.mesh,
time_integrator='split_explicit',
name='ssh_adjustment')

self.add_namelist_options({'config_AM_globalStats_enable': '.false.'})
self.add_namelist_file('compass.ocean.namelists',
Expand All @@ -55,24 +41,8 @@ def __init__(self, test_case):
filename='graph.info',
work_dir_target=f'{mesh_path}/culled_graph.info')

self.add_model_as_input()

self.add_output_file(filename='adjusted_init.nc')

def setup(self):
"""
Set up the test case in the work directory, including downloading any
dependencies
"""
self._get_resources()

def constrain_resources(self, available_resources):
"""
Update resources at runtime from config options
"""
self._get_resources()
super().constrain_resources(available_resources)

def run(self):
"""
Run this step of the testcase
Expand All @@ -81,10 +51,3 @@ def run(self):
iteration_count = config.getint('ssh_adjustment', 'iterations')
adjust_ssh(variable='landIcePressure', iteration_count=iteration_count,
step=self)

def _get_resources(self):
# get the these properties from the config options
config = self.config
self.ntasks = config.getint('global_ocean', 'forward_ntasks')
self.min_tasks = config.getint('global_ocean', 'forward_min_tasks')
self.openmp_threads = config.getint('global_ocean', 'forward_threads')
11 changes: 2 additions & 9 deletions compass/ocean/tests/global_ocean/mesh/arrm10to60/arrm10to60.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,9 @@ max_layer_thickness = 150.0
init_ntasks = 256
# minimum of cores, below which the step fails
init_min_tasks = 64
# maximum memory usage allowed (in MB)
init_max_memory = 1000

## config options related to the forward steps
# number of cores to use
forward_ntasks = 2000
# minimum of cores, below which the step fails
forward_min_tasks = 200
# maximum memory usage allowed (in MB)
forward_max_memory = 1000
# the approximate number of cells in the mesh
approx_cell_count = 600000

## metadata related to the mesh
# the prefix (e.g. QU, EC, WC, SO)
Expand Down
Loading