
Consistent node execution order by sorting nodes with SequentialRunner #1604

Merged
2 changes: 2 additions & 0 deletions RELEASE.md
@@ -25,9 +25,11 @@
* Reduced number of log lines by changing the logging level from `INFO` to `DEBUG` for low priority messages.
* Kedro's framework-side logging configuration no longer performs file-based logging. Hence superfluous `info.log`/`errors.log` files are no longer created in your project root, and running Kedro on read-only file systems such as Databricks Repos is now possible.
* The `root` logger is now set to the Python default level of `WARNING` rather than `INFO`. Kedro's logger is still set to emit `INFO` level messages.
* Kedro pipeline will have consistent execution order given the same set of nodes when using with `SequentialRunner`.
Contributor:
Suggested change
* Kedro pipeline will have consistent execution order given the same set of nodes when using with `SequentialRunner`.
* `SequentialRunner` now consistently runs nodes in the same order across multiple runs.

I still don't think this is a very clear explanation though. Maybe what you have is better 🤔

Contributor Author:

Maybe @MerelTheisenQB has a better idea.

Member:

I don't know if this is any better to be honest 😅 "Added sorting of nodes for the SequentialRunner to facilitate consistent execution order across multiple runs. "

* `kedro jupyter notebook/lab` no longer reuses a Jupyter kernel.
* Required `cookiecutter>=2.1.1` to address a [known command injection vulnerability](https://security.snyk.io/vuln/SNYK-PYTHON-COOKIECUTTER-2414281).


## Upcoming deprecations for Kedro 0.19.0
* `kedro.extras.ColorHandler` will be removed in 0.19.0.

10 changes: 7 additions & 3 deletions kedro/pipeline/pipeline.py
@@ -352,7 +352,7 @@ def nodes(self) -> List[Node]:
return list(chain.from_iterable(self._topo_sorted_nodes))

@property
def grouped_nodes(self) -> List[Set[Node]]:
def grouped_nodes(self) -> List[List[Node]]:
"""Return a list of the pipeline nodes in topologically ordered groups,
i.e. if node A needs to be run before node B, it will appear in an
earlier group.
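
For illustration (not part of the diff): each inner list returned by `grouped_nodes` is one topological layer, and after this change the layers are ordered lists rather than sets. A minimal sketch, assuming a two-node pipeline with made-up names:

```python
from kedro.pipeline import Pipeline, node


def identity(x):
    return x


pipeline = Pipeline(
    [
        node(identity, "a", "b", name="first"),
        node(identity, "b", "c", name="second"),
    ]
)

# "first" produces "b", which "second" consumes, so "first" appears in an
# earlier group; each group is now a list, not a set.
for group in pipeline.grouped_nodes:
    print([n.name for n in group])
# Prints: ['first'] then ['second']
```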
@@ -870,7 +870,7 @@ def _validate_transcoded_inputs_outputs(nodes: List[Node]) -> None:
)


def _topologically_sorted(node_dependencies) -> List[Set[Node]]:
def _topologically_sorted(node_dependencies) -> List[List[Node]]:
"""Topologically group and sort (order) nodes such that no node depends on
a node that appears in the same or a later group.

@@ -894,7 +894,11 @@ def _circle_error_message(error_data: Dict[str, str]) -> str:
return f"Circular dependencies exist among these items: {circular}"

try:
return list(toposort(node_dependencies))
result = []
for dependencies in toposort(node_dependencies):
# Sort each group so the order is consistent across runs, e.g. with SequentialRunner
result.append(sorted(dependencies))
return result
noklam marked this conversation as resolved.
except ToposortCircleError as exc:
message = _circle_error_message(exc.data)
raise CircularDependencyError(message) from exc
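To see why the sort is needed, here is a standalone sketch (not part of the diff), assuming the `toposort` package: `toposort` yields each dependency level as a Python `set`, whose iteration order can vary between interpreter runs, so materialising each level with `sorted` pins down one deterministic order. Strings stand in for `Node` objects here; Kedro nodes are order-comparable, which is what makes `sorted(dependencies)` valid above.

```python
from toposort import toposort

# F depends on five independent source nodes, mirroring the test DAG below
node_dependencies = {"F": {"A", "B", "C", "D", "E"}}

# toposort() yields {"A", ..., "E"} first, then {"F"}; set iteration
# order is not guaranteed, so each group is sorted into a list.
groups = [sorted(group) for group in toposort(node_dependencies)]

assert groups == [["A", "B", "C", "D", "E"], ["F"]]  # identical every run
```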
54 changes: 52 additions & 2 deletions tests/pipeline/test_pipeline.py
@@ -1,10 +1,13 @@
import random
import re
from itertools import chain
from typing import List

import pytest

import kedro
from kedro.pipeline import Pipeline, node
from kedro.pipeline.node import Node
from kedro.pipeline.pipeline import (
CircularDependencyError,
ConfirmNotUniqueError,
@@ -253,8 +256,9 @@ def test_grouped_nodes(self, input_data):
grouped = pipeline.grouped_nodes
# Flatten a list of grouped nodes
assert pipeline.nodes == list(chain.from_iterable(grouped))
# Check each grouped node matches with expected group
assert all(g == e for g, e in zip(grouped, expected))
# Check each group of nodes matches the expected group; the order within a
# group is non-deterministic, so we only check they contain the same set of nodes.
assert all(set(g) == e for g, e in zip(grouped, expected))

def test_free_input(self, input_data):
nodes = input_data["nodes"]
@@ -588,6 +592,52 @@ def test_connected_pipeline(self, disjoint_pipeline):
assert len(pipeline.inputs()) == 1
assert len(pipeline.outputs()) == 1

def test_pipeline_consistent_nodes_order(self, mocker):
"""
A pipeline with multiple possible execution orders should resolve to a
consistent solution.
Possible solutions:
1. A -> B -> C -> D -> E -> F
2. B -> A -> C -> D -> E -> F
3. ... any permutation, as long as F is executed last.

Although we cannot know in advance which permutation will be chosen, the
pipeline should always output the same one.

A-- \
B--- \
C---- F
D--- /
E-- /
"""
Contributor:

This is nice 👍


def multiconcat(*args):
return sum(args)
noklam marked this conversation as resolved.

mock_hash = mocker.patch(f"{__name__}.Node.__hash__")
expected_sorted_nodes: List[Node] = None

# Repeat 10 times so we can be sure it is not purely by chance
for _ in range(10):
mock_hash.return_value = random.randint(1, 10**20)
Contributor:

I don't think this is doing what you want it to do. This is currently fixing the hash of every Node instance in this pipeline to be the same. We don't want hash(node1) to be the same as hash(node2). What we should have is:

        n1 = node(constant_output, None, "A")
        n2 = node(constant_output, None, "B")
        n3 = node(constant_output, None, "C")
        n4 = node(constant_output, None, "D")
        n5 = node(constant_output, None, "E")
        n6 = node(multiconcat, ["A", "B", "C", "D", "E"], "F")

       # You don't have to nest these, you can put them all in one with block
       # But actually for Python < 3.10 it's super ugly formatting still: https://stackoverflow.com/questions/3024925/create-a-with-block-on-several-context-managers
        for _ in range(10):
            with mock.patch.object(n1, "__hash__", random.randint(1, 1e20)):
                with mock.patch.object(n2, "__hash__", random.randint(1, 1e20)):
                    with mock.patch.object(n3, "__hash__", random.randint(1, 1e20)):
                        with mock.patch.object(n4, "__hash__", random.randint(1, 1e20)):
                            with mock.patch.object(n5, "__hash__", random.randint(1, 1e20)):
                                with mock.patch.object(n6, "__hash__", random.randint(1, 1e20)):
                                    inverted_fork_dags = Pipeline([n1, n2, n3, n4, n5, n6])
                                    # use inverted_fork_dags.nodes

... but this is still not a great test because the current code in main doesn't fail it 😬

After spending a looooong time playing around with this, I think it might just not be worth writing a test for it at all... So long as it works as it should in manual testing, then I think we're fine.

Happy to explain more about what I discovered while playing around with the testing here if you want to hear. It's certainly a tricky one.

Contributor Author:

Let's chat tomorrow!


inverted_fork_dags = Pipeline(
[
node(constant_output, None, "A"),
node(constant_output, None, "B"),
node(constant_output, None, "C"),
node(constant_output, None, "D"),
node(constant_output, None, "E"),
node(multiconcat, ["A", "B", "C", "D", "E"], "F"),
]
)
if not expected_sorted_nodes:
    expected_sorted_nodes = inverted_fork_dags.nodes
else:
    assert expected_sorted_nodes == inverted_fork_dags.nodes
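
Following up on the review thread above, a hedged sketch of one alternative the discussion points towards: patch `Node.__hash__` once with a function that caches a distinct random value per instance, which avoids both the shared-hash problem and the nested `with` blocks. `patch_random_node_hashes` is a hypothetical helper, not part of this PR.

```python
import random
from unittest import mock

from kedro.pipeline.node import Node


def patch_random_node_hashes():
    """Give every Node instance its own random hash, stable per instance."""
    cache = {}

    def _random_hash(self):
        # setdefault keeps a node's hash constant for the patch's lifetime
        return cache.setdefault(id(self), random.randint(1, 10**20))

    return mock.patch.object(Node, "__hash__", _random_hash)


# Usage inside the test body:
# with patch_random_node_hashes():
#     inverted_fork_dags = Pipeline([...])
#     assert inverted_fork_dags.nodes == expected_sorted_nodes
```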


class TestPipelineDescribe:
def test_names_only(self, str_node_inputs_list):
2 changes: 1 addition & 1 deletion tests/pipeline/test_pipeline_with_transcoding.py
@@ -121,7 +121,7 @@ def test_grouped_nodes(self, input_data):
# Flatten a list of grouped nodes
assert pipeline.nodes == list(chain.from_iterable(grouped))
# Check each grouped node matches with expected group
assert all(g == e for g, e in zip(grouped, expected))
assert all(set(g) == e for g, e in zip(grouped, expected))

def test_free_input(self, input_data):
nodes = input_data["nodes"]