Skip to content

Commit

Permalink
Added howto notebook
Browse files Browse the repository at this point in the history
  • Loading branch information
cpelley committed Oct 25, 2024
1 parent fe52fb2 commit ebadf31
Show file tree
Hide file tree
Showing 4 changed files with 629 additions and 234 deletions.
234 changes: 7 additions & 227 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,242 +43,22 @@ usage: dagrunner-execute-graph [-h] [--scheduler SCHEDULER] [--num-workers NUM_W
```
see `dagrunner-execute-graph --help` for more information.

## Example library usage
## Example DAGRunner usage

Let's demonstrate defining a graph, where in our simple example we define each node with an associated ID. Our task is then to concatenate this node ID as a string with its dependencies.

This demonstrates:
- Defining a custom processing module (i.e. plugin).
- Basic graph generation.
- Passing data in memory.
- Execution with our chosen scheduler.

Our networkx graph is constructed from a list of edges (see official [edge](https://networkx.org/documentation/stable/reference/glossary.html#term-edge) definition) and settings. The former defines the connection between 'nodes' (see official [node](https://networkx.org/documentation/stable/reference/glossary.html#term-node) definition), while the later defines a lookup between node and the nodes attributes (see official [node attributes](https://networkx.org/documentation/stable/reference/glossary.html#term-node-attribute) definition). It is the nodes attributes that instruct DAGRunner on how to execute that given 'node'. In short, these attributes are passed directly to DAGRunner's [plugin-executor](https://github.com/MetOffice/dagrunner/blob/main/docs/dagrunner.execute_graph.md#function-plugin_executor) function (by default, see [Customise node execution](#customise-node-execution)). This takes a 'call' argument what plugin to be called along with any keyword arguments.

### Defining a custom processing module (plugin)

First, ensure that 'dagrunner' is on the `PYTHONPATH` (i.e. [installation](#installation)).

Now let's subclass the abstract [Plugin](docs/dagrunner.plugin_framework.md#class-plugin) class from `dagrunner` and define a processing module that accepts an 'id' as argument and concatenates this together with the node ID (result returned) of dependent (predecessor) nodes.

```python
from dagrunner.plugin_framework import Plugin

class ProcessID(Plugin):
"""Concatenate node id together"""
def __call__(self, *args, id=None, verbose=False):
concat_arg_id = str(id)
if args and args[0]:
concat_arg_id = '_'.join([str(arg) for arg in args if arg]) + f"_{id}"
return concat_arg_id
```

### Define our graph 'nodes' container

Our node could represent any range of properties but for the purpose of this example we will define only 'step' and 'leadtime' properties.
Also we could define any object to represent a 'node' but commonly used objects for this purpose include [dataclasses](https://docs.python.org/3/library/dataclasses.html#dataclasses.dataclass) and [namedtuples](https://docs.python.org/3/library/collections.html#collections.namedtuple).
For the sake of graph visualisation, we will define a `__str__` special method too.

```python
from dataclasses import dataclass

@dataclass(order=True, eq=True, frozen=True)
class Node:
step: str = None
leadtime: int = None

def __str__(self):
return f"S:{self.step}_L:{self.leadtime}"
```

### Define the graph itself (configuration/recipe depending on preferred terminology)

Let's define a graph with two independent branches. One for the earlier leadtime and a second for the later leadtime.
```python
HOUR = 3600
MINUTE = 60
leadtimes = tuple(range(0, HOUR*2, HOUR))

SETTINGS = {}
EDGES = []

for leadtime in leadtimes:
# node1 -> node2
node1 = Node(step="step1", leadtime=leadtime)
node2 = Node(step="step2", leadtime=leadtime)
EDGES.append((node1, node2))

# node3 -> node4
node3 = Node(step="step3", leadtime=leadtime)
node4 = Node(step="step4", leadtime=leadtime)
EDGES.append((node3, node4))

# node2 -> node5
node5 = Node(step="step5", leadtime=leadtime)
EDGES.append((node2, node5))

# node4 -> node5
node4 = Node(step="step4", leadtime=leadtime)
EDGES.append((node4, node5))

for nodenum in range(1, 6):
node = vars()[f"node{nodenum}"]
SETTINGS[node] = {
'call': (ProcessID, {"id": nodenum}),
}
```
We see that the processing step callable is provided via the 'call' of the node attribute dictionary.
It's value is `(callable, callable_keyword_arguments)`.
This 'callable' can be a python module dot path, callable function or even a class.

### Execute our graph with our chosen scheduler

For the execution of our graph, we will make use of the built-in generic [ExecuteGraph](docs/dagrunner.execute_graph.md#class-executegraph) class.
This class accepts graphs taking the form of a python dot module path to a networkx, a [networkx.DiGraph](https://networkx.org/documentation/stable/reference/classes/digraph.html) object or a tuple containing `(edges, nodes)`. In our simple example here, we will pass our edges and nodes above so that [ExecuteGraph](docs/dagrunner.execute_graph.md#class-executegraph) can construct our networkx graph for us.

Here we provide our edges and settings (nodes) and choose the 'single-threaded' scheduler.

```python
from dagrunner.execute_graph import ExecuteGraph
graph = ExecuteGraph((EDGES, SETTINGS), num_workers=None, scheduler="single-threaded", verbose=True)
```

Let's visualise our Networkx first - save a `png` image of the graph:
```python
import matplotlib.pyplot as plt
import networkx as nx
nx.draw(graph.nxgraph, with_labels=True)
plt.savefig("graph.png")
```
![image](https://github.com/MetOffice/dagrunner/assets/2071643/de103edd-16c9-487a-bf22-3d50d81c908c)

Now, finally, let's execute it:
```python
graph()
```
```
args: []
call: (<class '__main__.ProcessID'>, {'id': 1})
ProcessID(*[], **{'id': 1})
result: '1'
args: ['1']
call: (<class '__main__.ProcessID'>, {'id': 2})
ProcessID(*['1'], **{'id': 2})
result: '1_2'
args: []
call: (<class '__main__.ProcessID'>, {'id': 3})
ProcessID(*[], **{'id': 3})
result: '3'
args: ['3']
call: (<class '__main__.ProcessID'>, {'id': 4})
ProcessID(*['3'], **{'id': 4})
result: '3_4'
args: ['1_2', '3_4']
call: (<class '__main__.ProcessID'>, {'id': 5})
ProcessID(*['1_2', '3_4'], **{'id': 5})
result: '1_2_3_4_5'
args: []
call: (<class '__main__.ProcessID'>, {'id': 1})
ProcessID(*[], **{'id': 1})
result: '1'
args: ['1']
call: (<class '__main__.ProcessID'>, {'id': 2})
ProcessID(*['1'], **{'id': 2})
result: '1_2'
args: []
call: (<class '__main__.ProcessID'>, {'id': 3})
ProcessID(*[], **{'id': 3})
result: '3'
args: ['3']
call: (<class '__main__.ProcessID'>, {'id': 4})
ProcessID(*['3'], **{'id': 4})
result: '3_4'
args: ['1_2', '3_4']
call: (<class '__main__.ProcessID'>, {'id': 5})
ProcessID(*['1_2', '3_4'], **{'id': 5})
result: '1_2_3_4_5'
Run-time: 20.03338298993185s
```
We can see that the 'result' of the two execution branches (each leadtime), demonstrates the concatenation of node IDs.
That is, the concatenation of node ID strings passed between nodes in the execution graph.

## Customising graph generation and its execution

### Customise graph construction

Graph construction is owned by you or your project, that utilises **dagrunner**.
We saw how to execute our graph in our [example](#execute-our-graph-with-our-chosen-scheduler). This [ExecuteGraph](docs/dagrunner.execute_graph.md#class-executegraph) class provides a means to customise what graph we actually execute by providing the means to pass it a **callable** which returns a networkx graph.
As mentioned previously, this can be a python dot module path or the object itself.

Typical uses include delaying the construction of your networkx graph until it is actually executed. However, this offsers complete flexibility for you to customise graph construction to your individual projects needs.

Note that modifying graph construction is an added complication and should be considered only where it is deemed absolutely necessary beyond the simple usecase (lazy construction).

#### example lazy graph construction

Let's say we define our edges and settings in the above [example](#example-library-usage) in a module accessed by 'node-edge-module-dot-path', a python module dot path. In this case, containing `EDGES` and `SETTINGS` objects.
We can then define a callable which is responsible for generating a networkx graph for these (when called).

```python
from importlib import import_module
import networkx as nx

def filter_missing(node):
return {k: v for k, v in vars(node).items() if v is not None}

def gen_networkx(config_dot_path):
config_subpkg = import_module(f"{config_dot_path}")
graph = nx.DiGraph()
for module in config_subpkg.__all__:
print(f"config_dot_path: {config_dot_path}, module: {module}")
mod = import_module(f"{config_dot_path}.{module}")
edges, nodes = mod.EDGES, mod.SETTINGS
nodes = {k: filter_missing(k) | nodes[k] for k in nodes.keys()}.items()
graph.add_edges_from(edges)
graph.add_nodes_from(nodes)
return graph

GRAPH = lambda: gen_networkx("<node-edge-module-dot-path>")
```
We can now provide a python module dot path to this graph object to the `dagrunner-execute-graph` script. The networkx graph will then be constructed when DAGrunner internally calls it before its execution.

### Customise node execution

The [ExecuteGraph](docs/dagrunner.execute_graph.md#class-executegraph) class accepts a custom 'plugin_executor' (rather than by default to use the built-in [plugin-executor](https://github.com/MetOffice/dagrunner/blob/main/docs/dagrunner.execute_graph.md#function-plugin_executor)).

The 'plugin_executor' is what wraps every 'node' and is responsible for understanding how to 'execute' the particular node it wraps. For example, the built-in [plugin-executor](https://github.com/MetOffice/dagrunner/blob/main/docs/dagrunner.execute_graph.md#function-plugin_executor) defines the contract we utilise in our example graph above, where 'call' takes the form `(callable object or python dot path to callable, callable keyword arguments)`. For each node, this plugin executor then calls the underlying processing module (plugin) with its provided arguments (as per 'call').

#### an example extended 'plugin-executor'

```python
from dagrunner.execute_graph import ExecuteGraph, plugin_executor

def custom_plugin_executor(*args, call=None, verbose=False, dry_run=False, **kwargs):
# do something custom
return plugin_executor(*args, call=call, verbose=verbose, dry_run=dry_run, **kwargs)
```

Now, let's execute our graph with our customised execution function.
```python
ExecuteGraph(..., plugin_executor=custom_plugin_executor, ...)
```
Note that you may choose to subclass [ExecuteGraph](docs/dagrunner.execute_graph.md#class-executegraph) and or write a custom commandline script to call it, depending on your requirements.
See [docs/demo.ipynb](docs/demo.ipynb)

## Processing modules (aka plugins)

DAGrunner concerns itself with graph execution and does not strictly require processing modules (plugins) to take any particular form. That is, you may or may not choose to use or subclass the plugins provided by DAGrunner.
However, for convenience, DAGrunner does define some plugins which fall into two broad categories, as defined by two abstract classes. One is the basic [Plugin](docs/dagrunner.plugin_framework.md#class-plugin) which defines a reasonable standard UI. The other is [NodeAwarePlugin](docs/dagrunner.plugin_framework.md#class-nodeawareplugin). This is identical to the basic [Plugin](docs/dagrunner.plugin_framework.md#class-plugin) but additionally triggers the the built-in plugin-executor function to pass your plugin all of its node parameters (i.e. extend the keyword arguments with node properties in its call). That is, making the plugin we define 'node aware'.
However, for convenience, DAGrunner does define some plugins which fall into two broad categories, some abstract and some for use as they are.

Plugins included:
- [Plugin](docs/dagrunner.plugin_framework.md#class-plugin): Abstract class on which to define other plugins.
- [NodeAwarePlugin](docs/dagrunner.plugin_framework.md#class-nodeawareplugin): Abstract class on which to define 'node aware' plugins.
- [Shell(Plugin)](docs/dagrunner.plugin_framework.md#class-shell): Execute a subprocess command.
- [DataPolling(Plugin)](docs/dagrunner.plugin_framework.md#class-datapolling): Poll for availability of files.
- [Input(NodeAwarePlugin)](docs/dagrunner.plugin_framework.md#class-input): Given a filepath, expand it using keyword argument, environment variables and any node properties provided.
See [here](docs/dagrunner.plugin_framework.md) for more information.

## Schedulers

The `dagrunner-execute-graph` script exposes a scheduler argument for specifying our preferred scheduler. Schedulers include those provided by [dask](https://www.dask.org/), [ray](https://docs.ray.io/en/latest/ray-more-libs/dask-on-ray.html) as well as an in-house multiprocessing asynchronous scheduler (built upon the [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) library). See command help for further details.
The `dagrunner-execute-graph` script exposes a scheduler argument for specifying our preferred scheduler. DAGRunner provides a layer of abstraction for schedulers. This enables a range of schedulers to be selected as per requirement.

These range from [dask](https://www.dask.org/), [ray](https://docs.ray.io/en/latest/ray-more-libs/dask-on-ray.html) to our own in-house multiprocessing asynchronous scheduler (built upon the [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) library). See command help for further details.

## Logging and monitoring

Expand Down
14 changes: 13 additions & 1 deletion dagrunner/execute_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,19 @@ def __init__(
- `networkx_graph` (networkx.DiGraph, callable or str):
Python dot path to a `networkx.DiGraph` or tuple(edges, settings) object, or
callable that returns one. When called via the library, we support passing
the `networkx.DiGraph` or `tuple(edges, settings)` objects directly.
the `networkx.DiGraph` or `tuple(edges, settings)` objects directly. Note
that 'settings' represent a mapping (dictionary) between node and the node
attributes. When provided, DAGrunner will attempt to convert this tuple into
a networkx through the following pseudo-code:
1. Copy node identity properties into the node attributes dictionary
and remove any attributes that are 'None' ('settings' from the tuple
provided).
2. Construct an empty networkx.DiGraph object.
3. Add edges to this graph ('edges' from the tuple provided).
4. Add node to attributes lookup to this graph ('settings' from the tuple
provided).
It is recommended that the user instead provide the networkx graph directly
rather than relying on DAGRunner to decide how to construct it.
- `networkx_graph_kwargs` (dict):
Keyword arguments to pass to the `networkx_graph` when it represents a
callable. Optional.
Expand Down
12 changes: 6 additions & 6 deletions dagrunner/plugin_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ def __call__(
self, *args, timeout=60 * 2, polling=1, file_count=None, verbose=False
):
"""
Poll for availability of files
Poll for the availability of files
Poll for data and return when all are available or otherwise raise an
Poll for data and return when all data is available or otherwise raise an
exception if the timeout is reached.
Args:
Expand Down Expand Up @@ -268,11 +268,11 @@ def host_and_glob_key(path):
class Input(NodeAwarePlugin):
def __call__(self, filepath, node_properties=None, **kwargs):
"""
Given a filepath, expand it and return this string
Given a string, expand it and return this expanded string.
Expand the provided filepath using the keyword arguments and environment
variables. Note that this plugin is 'node aware' since it is derived from the
`NodeAwarePlugin`.
Expand the provided string (typically representing a filepath) using the
keyword arguments and environment variables. Note that this plugin is
'node aware' since it is derived from the `NodeAwarePlugin`.
Args:
- `filepath` (str): The filepath to be expanded.
Expand Down
Loading

0 comments on commit ebadf31

Please sign in to comment.