Skip to content

Commit

Permalink
Merge pull request #453 from ecmwf-ifs/nabr-pipeline-plan
Browse files Browse the repository at this point in the history
CMake Plan: Pipeline dry-run and transformation-based plan creation
  • Loading branch information
reuterbal authored Dec 12, 2024
2 parents fe6bf33 + 633d038 commit a26cb44
Show file tree
Hide file tree
Showing 8 changed files with 413 additions and 183 deletions.
139 changes: 67 additions & 72 deletions loki/batch/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

from enum import Enum, auto
from os.path import commonpath
from pathlib import Path
from codetiming import Timer
Expand All @@ -21,10 +22,40 @@

from loki.frontend import FP, REGEX, RegexParserClass
from loki.tools import as_tuple, CaseInsensitiveDict, flatten
from loki.logging import info, perf, warning, debug, error

from loki.logging import info, perf, warning, error

__all__ = ['Scheduler']
__all__ = ['ProcessingStrategy', 'Scheduler']


class ProcessingStrategy(Enum):
    """
    List of available processing types for :any:`Scheduler.process`

    Multiple options exist for how the :any:`Scheduler.process` method can
    apply a provided :any:`Transformation` or :any:`Pipeline` object to the
    items in a :any:`Scheduler` graph. The permissible options and default
    values are provided by this class.

    Note that :attr:`DEFAULT` is assigned the value of :attr:`SEQUENCE` and
    is therefore an alias of that member rather than a distinct one:
    iterating over this enum yields only :attr:`SEQUENCE` and :attr:`PLAN`.
    """

    SEQUENCE = auto()
    """Sequential processing of transformations

    For each transformation in a pipeline, the :any:`Transformation.apply`
    method is called for every item in the graph, following the graph traversal
    mode specified in the transformation's manifest, before repeating the
    same for the next transformation in the pipeline.
    """

    PLAN = auto()
    """Planning mode using :any:`ProcessingStrategy.SEQUENCE` strategy.

    This calls :any:`Transformation.plan` (instead of :any:`Transformation.apply`)
    for each transformation.
    """

    # Alias, not a new member: must stay below SEQUENCE so the name resolves
    DEFAULT = SEQUENCE
    """Default processing strategy, currently :any:`ProcessingStrategy.SEQUENCE`"""


class Scheduler:
Expand Down Expand Up @@ -382,7 +413,7 @@ def rekey_item_cache(self):
if item.name not in deleted_keys
)

def process(self, transformation):
def process(self, transformation, proc_strategy=ProcessingStrategy.DEFAULT):
"""
Process all :attr:`items` in the scheduler's graph with either
a :any:`Pipeline` or a single :any:`Transformation`.
Expand All @@ -396,18 +427,21 @@ def process(self, transformation):
----------
transformation : :any:`Transformation` or :any:`Pipeline`
The transformation or transformation pipeline to apply
proc_strategy : :any:`ProcessingStrategy`
The processing strategy to use when applying the given
:data:`transformation` to the scheduler's graph.
"""
if isinstance(transformation, Transformation):
self.process_transformation(transformation=transformation)
self.process_transformation(transformation=transformation, proc_strategy=proc_strategy)

elif isinstance(transformation, Pipeline):
self.process_pipeline(pipeline=transformation)
self.process_pipeline(pipeline=transformation, proc_strategy=proc_strategy)

else:
error('[Loki::Scheduler] Batch processing requires Transformation or Pipeline object')
raise RuntimeError('[Loki] Could not batch process {transformation_or_pipeline}')
raise RuntimeError(f'Could not batch process {transformation}')

def process_pipeline(self, pipeline):
def process_pipeline(self, pipeline, proc_strategy=ProcessingStrategy.DEFAULT):
"""
Process a given :any:`Pipeline` by applying its associated
transformations in turn.
Expand All @@ -416,11 +450,14 @@ def process_pipeline(self, pipeline):
----------
pipeline : :any:`Pipeline`
The transformation pipeline to apply
proc_strategy : :any:`ProcessingStrategy`
The processing strategy to use when applying the given
:data:`pipeline` to the scheduler's graph.
"""
for transformation in pipeline.transformations:
self.process_transformation(transformation)
self.process_transformation(transformation, proc_strategy=proc_strategy)

def process_transformation(self, transformation):
def process_transformation(self, transformation, proc_strategy=ProcessingStrategy.DEFAULT):
"""
Process all :attr:`items` in the scheduler's graph
Expand All @@ -445,6 +482,9 @@ def process_transformation(self, transformation):
----------
transformation : :any:`Transformation`
The transformation to apply over the dependency tree
proc_strategy : :any:`ProcessingStrategy`
The processing strategy to use when applying the given
:data:`transformation` to the scheduler's graph.
"""
def _get_definition_items(_item, sgraph_items):
# For backward-compatibility with the DependencyTransform and LinterTransformation
Expand All @@ -464,6 +504,10 @@ def _get_definition_items(_item, sgraph_items):
items += (item,) + child_items
return items

if proc_strategy not in (ProcessingStrategy.SEQUENCE, ProcessingStrategy.PLAN):
error(f'[Loki::Scheduler] Processing {proc_strategy} is not implemented!')
raise RuntimeError(f'Could not batch process {transformation}')

trafo_name = transformation.__class__.__name__
log = f'[Loki::Scheduler] Applied transformation <{trafo_name}>' + ' in {:.2f}s'
with Timer(logger=info, text=log):
Expand Down Expand Up @@ -498,7 +542,8 @@ def _get_definition_items(_item, sgraph_items):
_item.scope_ir, role=_item.role, mode=_item.mode,
item=_item, targets=_item.targets, items=_get_definition_items(_item, sgraph_items),
successors=graph.successors(_item, item_filter=item_filter),
depths=graph.depths, build_args=self.build_args
depths=graph.depths, build_args=self.build_args,
plan_mode=proc_strategy == ProcessingStrategy.PLAN
)

if transformation.renames_items:
Expand Down Expand Up @@ -609,72 +654,22 @@ def callgraph(self, path, with_file_graph=False, with_legend=False):
warning(f'[Loki] Failed to render filegraph due to graphviz error:\n {e}')

@Timer(logger=perf, text='[Loki::Scheduler] Wrote CMake plan file in {:.2f}s')
def write_cmake_plan(self, filepath, mode, buildpath, rootpath):
def write_cmake_plan(self, filepath, rootpath=None):
"""
Generate the "plan file" for CMake
The plan file is a CMake file defining three lists:
* ``LOKI_SOURCES_TO_TRANSFORM``: The list of files that are
processed in the dependency graph
* ``LOKI_SOURCES_TO_APPEND``: The list of files that are created
and have to be added to the build target as part of the processing
* ``LOKI_SOURCES_TO_REMOVE``: The list of files that are no longer
required (because they have been replaced by transformed files) and
should be removed from the build target.
See :any:`CMakePlanTransformation` for the specification of that file.
These lists are used by the CMake wrappers to schedule the source
updates and update the source lists of the CMake target object accordingly.
Parameters
----------
filepath : str or Path
The path of the CMake file to write.
rootpath : str or Path (optional)
If given, all paths in the CMake file will be made relative to this root directory
"""
info(f'[Loki] Scheduler writing CMake plan: {filepath}')

rootpath = None if rootpath is None else Path(rootpath).resolve()
buildpath = None if buildpath is None else Path(buildpath)
sources_to_append = []
sources_to_remove = []
sources_to_transform = []

# Filter the SGraph to get a pure call-tree
item_filter = ProcedureItem
if self.config.enable_imports:
item_filter = as_tuple(item_filter) + (ModuleItem,)
graph = self.sgraph.as_filegraph(
self.item_factory, self.config, item_filter=item_filter,
exclude_ignored=True
)
traversal = SFilter(graph, reverse=False, include_external=False)
for item in traversal:
if item.is_ignored:
continue

sourcepath = item.path.resolve()
newsource = sourcepath.with_suffix(f'.{mode.lower()}.F90')
if buildpath:
newsource = buildpath/newsource.name

# Make new CMake paths relative to source again
if rootpath is not None:
sourcepath = sourcepath.relative_to(rootpath)

debug(f'Planning:: {item.name} (role={item.role}, mode={mode})')

# Inject new object into the final binary libs
if newsource not in sources_to_append:
sources_to_transform += [sourcepath]
if item.replicate:
# Add new source file next to the old one
sources_to_append += [newsource]
else:
# Replace old source file to avoid ghosting
sources_to_append += [newsource]
sources_to_remove += [sourcepath]

with Path(filepath).open('w') as f:
s_transform = '\n'.join(f' {s}' for s in sources_to_transform)
f.write(f'set( LOKI_SOURCES_TO_TRANSFORM \n{s_transform}\n )\n')

s_append = '\n'.join(f' {s}' for s in sources_to_append)
f.write(f'set( LOKI_SOURCES_TO_APPEND \n{s_append}\n )\n')

s_remove = '\n'.join(f' {s}' for s in sources_to_remove)
f.write(f'set( LOKI_SOURCES_TO_REMOVE \n{s_remove}\n )\n')
from loki.transformations.build_system.plan import CMakePlanTransformation # pylint: disable=import-outside-toplevel
planner = CMakePlanTransformation(rootpath=rootpath)
self.process(planner, proc_strategy=ProcessingStrategy.PLAN)
planner.write_plan(filepath)
37 changes: 20 additions & 17 deletions loki/batch/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
from loki.batch import (
Scheduler, SchedulerConfig, Item, ProcedureItem,
ProcedureBindingItem, InterfaceItem, TypeDefItem, SFilter,
ExternalItem, Transformation, Pipeline
ExternalItem, Transformation, Pipeline, ProcessingStrategy
)
from loki.expression import Scalar, Array, Literal, ProcedureSymbol
from loki.frontend import (
Expand All @@ -74,7 +74,7 @@
nodes as ir, FindNodes, FindInlineCalls, FindVariables
)
from loki.transformations import (
DependencyTransformation, ModuleWrapTransformation
DependencyTransformation, ModuleWrapTransformation, FileWriteTransformation
)


Expand Down Expand Up @@ -1001,10 +1001,13 @@ def test_scheduler_missing_files(testdir, config, frontend, strict, tmp_path):
# Check processing with missing items
class CheckApply(Transformation):

def apply(self, source, post_apply_rescope_symbols=False, **kwargs):
def apply(self, source, post_apply_rescope_symbols=False, plan_mode=False, **kwargs):
assert 'item' in kwargs
assert not isinstance(kwargs['item'], ExternalItem)
super().apply(source, post_apply_rescope_symbols=post_apply_rescope_symbols, **kwargs)
super().apply(
source, post_apply_rescope_symbols=post_apply_rescope_symbols,
plan_mode=plan_mode, **kwargs
)

if strict:
with pytest.raises(RuntimeError):
Expand Down Expand Up @@ -1156,35 +1159,35 @@ def test_scheduler_cmake_planner(tmp_path, testdir, frontend):
proj_b = sourcedir/'projB'

config = SchedulerConfig.from_dict({
'default': {'role': 'kernel', 'expand': True, 'strict': True, 'ignore': ('header_mod',)},
'default': {
'role': 'kernel',
'expand': True,
'strict': True,
'ignore': ('header_mod',),
'mode': 'foobar'
},
'routines': {
'driverB': {'role': 'driver'},
'kernelB': {'ignore': ['ext_driver']},
}
})
builddir = tmp_path/'scheduler_cmake_planner_dummy_dir'
builddir.mkdir(exist_ok=True)

# Populate the scheduler
# (this is the same as SchedulerA in test_scheduler_dependencies_ignore, so no need to
# check scheduler set-up itself)
scheduler = Scheduler(
paths=[proj_a, proj_b], includes=proj_a/'include',
config=config, frontend=frontend, xmods=[tmp_path]
config=config, frontend=frontend, xmods=[tmp_path],
output_dir=builddir
)

# Apply the transformation
builddir = tmp_path/'scheduler_cmake_planner_dummy_dir'
builddir.mkdir(exist_ok=True)
planfile = builddir/'loki_plan.cmake'

scheduler.write_cmake_plan(
filepath=planfile, mode='foobar', buildpath=builddir, rootpath=sourcedir
)

# Validate the generated lists
expected_files = {
proj_a/'module/driverB_mod.f90', proj_a/'module/kernelB_mod.F90',
proj_a/'module/compute_l1_mod.f90', proj_a/'module/compute_l2_mod.f90'
}
scheduler.process(FileWriteTransformation(), proc_strategy=ProcessingStrategy.PLAN)
scheduler.write_cmake_plan(filepath=planfile, rootpath=sourcedir)

# Validate the plan file content
plan_pattern = re.compile(r'set\(\s*(\w+)\s*(.*?)\s*\)', re.DOTALL)
Expand Down
Loading

0 comments on commit a26cb44

Please sign in to comment.