Skip to content

Commit

Permalink
Generalize plan mode to processing strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
reuterbal committed Dec 10, 2024
1 parent ad75311 commit b49bba0
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 63 deletions.
86 changes: 63 additions & 23 deletions loki/batch/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

from enum import Enum, auto
from os.path import commonpath
from pathlib import Path
from codetiming import Timer
Expand All @@ -21,10 +22,40 @@

from loki.frontend import FP, REGEX, RegexParserClass
from loki.tools import as_tuple, CaseInsensitiveDict, flatten
from loki.logging import info, perf, warning, debug, error

from loki.logging import info, perf, warning, error

__all__ = ['Scheduler']
__all__ = ['ProcessingStrategy', 'Scheduler']


class ProcessingStrategy(Enum):
"""
List of available processing types for :any:`Scheduler.process`
Multiple options exist how the :any:`Scheduler.process` method can
apply a provided :any:`Transformation` or :any:`Pipeline` object to the
items in a :any:`Scheduler` graph. The permissible options and default
values are provided by this class.
"""

SEQUENCE = auto()
"""Sequential processing of transformations
For each transformation in a pipeline, the :any:`Transformation.apply`
method is called for every item in the graph, following the graph traversal
mode specified in the transformation's manifest, before repeating the
same for the next transformation in the pipeline.
"""

PLAN = auto()
"""Planning mode using :any:`ProcessingStrategy.SEQUENCE` strategy.
This calls :any:`Transformation.plan` (instead of :any:`Transformation.apply`)
for each transformation.
"""

DEFAULT = SEQUENCE
"""Default processing strategy, currently :any:`ProcessingStrategy.SEQUENCE`"""


class Scheduler:
Expand Down Expand Up @@ -382,7 +413,7 @@ def rekey_item_cache(self):
if item.name not in deleted_keys
)

def process(self, transformation, plan=False):
def process(self, transformation, proc_strategy=ProcessingStrategy.DEFAULT):
"""
Process all :attr:`items` in the scheduler's graph with either
a :any:`Pipeline` or a single :any:`Transformation`.
Expand All @@ -396,18 +427,21 @@ def process(self, transformation, plan=False):
----------
transformation : :any:`Transformation` or :any:`Pipeline`
The transformation or transformation pipeline to apply
proc_strategy : :any:`ProcessingStrategy`
The processing strategy to use when applying the given
:data:`transformation` to the scheduler's graph.
"""
if isinstance(transformation, Transformation):
self.process_transformation(transformation=transformation, plan=plan)
self.process_transformation(transformation=transformation, proc_strategy=proc_strategy)

elif isinstance(transformation, Pipeline):
self.process_pipeline(pipeline=transformation, plan=plan)
self.process_pipeline(pipeline=transformation, proc_strategy=proc_strategy)

else:
error('[Loki::Scheduler] Batch processing requires Transformation or Pipeline object')
raise RuntimeError('[Loki] Could not batch process {transformation_or_pipeline}')
raise RuntimeError(f'Could not batch process {transformation}')

def process_pipeline(self, pipeline, plan=False):
def process_pipeline(self, pipeline, proc_strategy=ProcessingStrategy.DEFAULT):
"""
Process a given :any:`Pipeline` by applying its assocaited
transformations in turn.
Expand All @@ -416,11 +450,14 @@ def process_pipeline(self, pipeline, plan=False):
----------
transformation : :any:`Pipeline`
The transformation pipeline to apply
proc_strategy : :any:`ProcessingStrategy`
The processing strategy to use when applying the given
:data:`pipeline` to the scheduler's graph.
"""
for transformation in pipeline.transformations:
self.process_transformation(transformation, plan=plan)
self.process_transformation(transformation, process_type=proc_strategy)

Check failure on line 458 in loki/batch/scheduler.py

View workflow job for this annotation

GitHub Actions / code checks (3.11)

E1123: Unexpected keyword argument 'process_type' in method call (unexpected-keyword-arg)

def process_transformation(self, transformation, plan=False):
def process_transformation(self, transformation, proc_strategy=ProcessingStrategy.DEFAULT):
"""
Process all :attr:`items` in the scheduler's graph
Expand All @@ -445,6 +482,9 @@ def process_transformation(self, transformation, plan=False):
----------
transformation : :any:`Transformation`
The transformation to apply over the dependency tree
proc_strategy : :any:`ProcessingStrategy`
The processing strategy to use when applying the given
:data:`transformation` to the scheduler's graph.
"""
def _get_definition_items(_item, sgraph_items):
# For backward-compatibility with the DependencyTransform and LinterTransformation
Expand All @@ -464,6 +504,10 @@ def _get_definition_items(_item, sgraph_items):
items += (item,) + child_items
return items

if proc_strategy not in (ProcessingStrategy.SEQUENCE, ProcessingStrategy.PLAN):
error(f'[Loki::Scheduler] Processing {proc_strategy} is not implemented!')
raise RuntimeError(f'Could not batch process {transformation}')

trafo_name = transformation.__class__.__name__
log = f'[Loki::Scheduler] Applied transformation <{trafo_name}>' + ' in {:.2f}s'
with Timer(logger=info, text=log):
Expand Down Expand Up @@ -499,7 +543,7 @@ def _get_definition_items(_item, sgraph_items):
item=_item, targets=_item.targets, items=_get_definition_items(_item, sgraph_items),
successors=graph.successors(_item, item_filter=item_filter),
depths=graph.depths, build_args=self.build_args,
plan=plan
plan_mode=proc_strategy == ProcessingStrategy.PLAN
)

if transformation.renames_items:
Expand Down Expand Up @@ -614,22 +658,18 @@ def write_cmake_plan(self, filepath, rootpath=None):
"""
Generate the "plan file" for CMake
The plan file is a CMake file defining three lists:
* ``LOKI_SOURCES_TO_TRANSFORM``: The list of files that are
processed in the dependency graph
* ``LOKI_SOURCES_TO_APPEND``: The list of files that are created
and have to be added to the build target as part of the processing
* ``LOKI_SOURCES_TO_REMOVE``: The list of files that are no longer
required (because they have been replaced by transformed files) and
should be removed from the build target.
See :any:`CMakePlanTransformation` for the specification of that file.
These lists are used by the CMake wrappers to schedule the source
updates and update the source lists of the CMake target object accordingly.
Parameters
----------
filepath : str or Path
The path of the CMake file to write.
rootpath : str or Path (optional)
If given, all paths in the CMake file will be made relative to this root directory
"""
info(f'[Loki] Scheduler writing CMake plan: {filepath}')

from loki.transformations.build_system.plan import CMakePlanTransformation
from loki.transformations.build_system.plan import CMakePlanTransformation # pylint: disable=import-outside-toplevel
planner = CMakePlanTransformation(rootpath=rootpath)
self.process(planner, plan=True)
self.process(planner, plan_mode=True)

Check failure on line 674 in loki/batch/scheduler.py

View workflow job for this annotation

GitHub Actions / code checks (3.11)

E1123: Unexpected keyword argument 'plan_mode' in method call (unexpected-keyword-arg)
planner.write_plan(filepath)
4 changes: 2 additions & 2 deletions loki/batch/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
from loki.batch import (
Scheduler, SchedulerConfig, Item, ProcedureItem,
ProcedureBindingItem, InterfaceItem, TypeDefItem, SFilter,
ExternalItem, Transformation, Pipeline
ExternalItem, Transformation, Pipeline, ProcessingStrategy
)
from loki.expression import Scalar, Array, Literal, ProcedureSymbol
from loki.frontend import (
Expand Down Expand Up @@ -1183,7 +1183,7 @@ def test_scheduler_cmake_planner(tmp_path, testdir, frontend):
# Apply the transformation
planfile = builddir/'loki_plan.cmake'

scheduler.process(FileWriteTransformation(), plan=True)
scheduler.process(FileWriteTransformation(), proc_strategy=ProcessingStrategy.PLAN)
scheduler.write_cmake_plan(filepath=planfile, rootpath=sourcedir)

# Validate the plan file content
Expand Down
Loading

0 comments on commit b49bba0

Please sign in to comment.