From 6205d470f8b0adf6d3e6390209b71e2b80d5a526 Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Thu, 9 Jun 2022 14:17:39 +0100
Subject: [PATCH 1/9] Consistent node execution order by sorting node

Signed-off-by: Nok Chan
---
 kedro/pipeline/pipeline.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py
index dff0b214d7..fb56bcc360 100644
--- a/kedro/pipeline/pipeline.py
+++ b/kedro/pipeline/pipeline.py
@@ -870,7 +870,7 @@ def _validate_transcoded_inputs_outputs(nodes: List[Node]) -> None:
         )
 
 
-def _topologically_sorted(node_dependencies) -> List[Set[Node]]:
+def _topologically_sorted(node_dependencies) -> List[List[Node]]:
     """Topologically group and sort (order) nodes such that no node depends on
     a node that appears in the same or a later group.
 
@@ -894,7 +894,11 @@ def _circle_error_message(error_data: Dict[str, str]) -> str:
         return f"Circular dependencies exist among these items: {circular}"
 
     try:
-        return list(toposort(node_dependencies))
+        result = []
+        for dependencies in toposort(node_dependencies):
+            # Make sure it is sorted to have consistent execution order when run with SequentialRunner
+            result.append(sorted(dependencies))
+        return result
     except ToposortCircleError as exc:
         message = _circle_error_message(exc.data)
         raise CircularDependencyError(message) from exc

From 5319d5c8c3427e7416c5b56cfb7d96a1348756a2 Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Tue, 14 Jun 2022 11:01:27 +0100
Subject: [PATCH 2/9] Adding the unittest

Signed-off-by: Nok Chan
---
 tests/pipeline/test_pipeline.py | 50 +++++++++++++++++++++++++++++++--
 1 file changed, 48 insertions(+), 2 deletions(-)

diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 5b5cfbecd2..d095c82faa 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -12,8 +12,8 @@
     _strip_transcoding,
     _transcode_split,
 )
-
-
+from kedro.pipeline.node import Node
+import random
 class TestTranscodeHelpers:
     def test_split_no_transcode_part(self):
         assert _transcode_split("abc") == ("abc", "")
@@ -588,6 +588,52 @@ def test_connected_pipeline(self, disjoint_pipeline):
         assert len(pipeline.inputs()) == 1
         assert len(pipeline.outputs()) == 1
 
+    def test_pipeline_consistent_nodes_order(self, mocker):
+        """
+        Pipeline that have multiple possible execution orders should have consistent
+        solutions
+        Possible Solutions:
+        1. A -> B -> C -> D -> E -> F
+        2. B -> A -> C -> D -> E -> F
+        3 ... Any permutation as long as F is executed last.
+
+        Although we are not sure which permutation it is, but it should always output
+        the same permutation.
+
+        A-- \
+        B--- \
+        C---- F
+        D--- /
+        E-- /
+        """
+        def multiconcat(*args):
+            return sum(args)
+
+        mock_hash = mocker.patch(f"{__name__}.Node.__hash__")
+        expected_sorted_nodes = None
+
+        # Repeat 10 times so we can be sure it is not purely by chance
+        for i in range(1, 11):
+            mock_hash.return_value = random.randint(1, 1E20)
+
+            inverted_fork_dags = Pipeline(
+                [
+                    node(constant_output, None, "A"),
+                    node(constant_output, None, "B"),
+                    node(constant_output, None, "C"),
+                    node(constant_output, None, "D"),
+                    node(constant_output, None, "E"),
+                    node(multiconcat, ["A", "B", "C", "D", "E"], "F"),
+                ]
+            )
+            if not expected_sorted_nodes:
+                expected_sorted_nodes = inverted_fork_dags.nodes
+
+            else:
+
+                assert expected_sorted_nodes == inverted_fork_dags.nodes
+
+
 
 class TestPipelineDescribe:
     def test_names_only(self, str_node_inputs_list):

From b83b57c9b483f42165f54e34e449e4c0a92cbea7 Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Tue, 14 Jun 2022 11:51:03 +0100
Subject: [PATCH 3/9] fix test and linting

Signed-off-by: Nok Chan
---
 kedro/pipeline/pipeline.py      |  4 ++--
 tests/pipeline/test_pipeline.py | 20 ++++++++++++--------
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py
index fb56bcc360..637b6975dd 100644
--- a/kedro/pipeline/pipeline.py
+++ b/kedro/pipeline/pipeline.py
@@ -352,7 +352,7 @@ def nodes(self) -> List[Node]:
         return list(chain.from_iterable(self._topo_sorted_nodes))
 
     @property
-    def grouped_nodes(self) -> List[Set[Node]]:
+    def grouped_nodes(self) -> List[List[Node]]:
         """Return a list of the pipeline nodes in topologically ordered groups,
         i.e. if node A needs to be run before node B, it will appear in an
         earlier group.
@@ -896,7 +896,7 @@ def _circle_error_message(error_data: Dict[str, str]) -> str:
     try:
         result = []
         for dependencies in toposort(node_dependencies):
-            # Make sure it is sorted to have consistent execution order when run with SequentialRunner
+            # Sort it so it has consistent order when run with SequentialRunner
             result.append(sorted(dependencies))
         return result
     except ToposortCircleError as exc:
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index d095c82faa..2065bd706e 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -1,10 +1,13 @@
+import random
 import re
 from itertools import chain
+from typing import List
 
 import pytest
 
 import kedro
 from kedro.pipeline import Pipeline, node
+from kedro.pipeline.node import Node
 from kedro.pipeline.pipeline import (
     CircularDependencyError,
     ConfirmNotUniqueError,
@@ -12,8 +15,8 @@
     _strip_transcoding,
     _transcode_split,
 )
-from kedro.pipeline.node import Node
-import random
+
+
 class TestTranscodeHelpers:
     def test_split_no_transcode_part(self):
         assert _transcode_split("abc") == ("abc", "")
@@ -253,8 +256,9 @@ def test_grouped_nodes(self, input_data):
         grouped = pipeline.grouped_nodes
         # Flatten a list of grouped nodes
         assert pipeline.nodes == list(chain.from_iterable(grouped))
-        # Check each grouped node matches with expected group
-        assert all(g == e for g, e in zip(grouped, expected))
+        # Check each grouped node matches with the expected group, the order is
+        # non-deterministic, so we are only checking they have the same set of nodes.
+        assert all(set(g) == e for g, e in zip(grouped, expected))
 
     def test_free_input(self, input_data):
         nodes = input_data["nodes"]
@@ -606,15 +610,16 @@ def test_pipeline_consistent_nodes_order(self, mocker):
         D--- /
         E-- /
         """
+
         def multiconcat(*args):
             return sum(args)
 
         mock_hash = mocker.patch(f"{__name__}.Node.__hash__")
-        expected_sorted_nodes = None
+        expected_sorted_nodes: List[List[Node]] = None
 
         # Repeat 10 times so we can be sure it is not purely by chance
-        for i in range(1, 11):
-            mock_hash.return_value = random.randint(1, 1E20)
+        for _ in range(10):
+            mock_hash.return_value = random.randint(1, 1e20)
 
             inverted_fork_dags = Pipeline(
                 [
@@ -634,7 +639,6 @@ def multiconcat(*args):
                 assert expected_sorted_nodes == inverted_fork_dags.nodes
 
 
-
 class TestPipelineDescribe:
     def test_names_only(self, str_node_inputs_list):
         pipeline = Pipeline(str_node_inputs_list["nodes"])

From 7769907e3848eabc6db531b6e9d59679f82acc9d Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Tue, 14 Jun 2022 14:09:00 +0100
Subject: [PATCH 4/9] Fix unittests

Signed-off-by: Nok Chan
---
 tests/pipeline/test_pipeline_with_transcoding.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/pipeline/test_pipeline_with_transcoding.py b/tests/pipeline/test_pipeline_with_transcoding.py
index a2936a8e3e..ea4fcc2020 100644
--- a/tests/pipeline/test_pipeline_with_transcoding.py
+++ b/tests/pipeline/test_pipeline_with_transcoding.py
@@ -121,7 +121,7 @@ def test_grouped_nodes(self, input_data):
         # Flatten a list of grouped nodes
         assert pipeline.nodes == list(chain.from_iterable(grouped))
         # Check each grouped node matches with expected group
-        assert all(g == e for g, e in zip(grouped, expected))
+        assert all(set(g) == e for g, e in zip(grouped, expected))
 
     def test_free_input(self, input_data):
         nodes = input_data["nodes"]

From 0531ba26f873a1f8bc330f9475b060f085b472ed Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Tue, 14 Jun 2022 14:09:19 +0100
Subject: [PATCH 5/9] Update README

Signed-off-by: Nok Chan
---
 RELEASE.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/RELEASE.md b/RELEASE.md
index 1bf296a84f..19778810b6 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -25,6 +25,7 @@
 * Reduced number of log lines by changing the logging level from `INFO` to `DEBUG` for low priority messages.
 * Kedro's framework-side logging configuration no longer performs file-based logging. Hence superfluous `info.log`/`errors.log` files are no longer created in your project root, and running Kedro on read-only file systems such as Databricks Repos is now possible.
 * The `root` logger is now set to the Python default level of `WARNING` rather than `INFO`. Kedro's logger is still set to emit `INFO` level messages.
+* Kedro pipeline will have consistent execution order given the same set of nodes when using with `SequentialRunner`.
 
 ## Upcoming deprecations for Kedro 0.19.0
 * `kedro.extras.ColorHandler` will be removed in 0.19.0.
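The mechanism behind these first five patches: `toposort()` yields sets of nodes, and set iteration order is not guaranteed, so flattening the groups into `Pipeline.nodes` could produce a different node order on every run. Sorting each group before flattening removes that nondeterminism. A minimal sketch of the idea (not the Kedro code itself), using plain strings in place of `Node` objects and assuming the `toposort` package is installed:

    from itertools import chain
    from toposort import toposort

    # The "inverted fork" from the unit test: F depends on five otherwise free
    # nodes, so the first group is the unordered set {A, B, C, D, E} and F is last.
    node_dependencies = {"F": {"A", "B", "C", "D", "E"}}

    # Sorting each group is the change PATCH 1/9 makes; without it, the flattened
    # order of A..E follows set iteration order and can differ between runs.
    groups = [sorted(group) for group in toposort(node_dependencies)]

    print(groups)                             # [['A', 'B', 'C', 'D', 'E'], ['F']]
    print(list(chain.from_iterable(groups)))  # always A, B, C, D, E, then F

Sorting only works because the group elements are comparable; the real change makes the same assumption about `Node`, which Kedro's node class supports, so `sorted(dependencies)` is enough to stabilise both `nodes` and `grouped_nodes`.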
From 36527e2408877e7ae0bd05e052b491a1f54a48f3 Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Wed, 15 Jun 2022 23:32:05 +0100
Subject: [PATCH 6/9] remove test

Signed-off-by: Nok Chan
---
 tests/pipeline/test_pipeline.py | 46 ---------------------------------
 1 file changed, 46 deletions(-)

diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 2065bd706e..11ae531278 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -592,52 +592,6 @@ def test_connected_pipeline(self, disjoint_pipeline):
         assert len(pipeline.inputs()) == 1
         assert len(pipeline.outputs()) == 1
 
-    def test_pipeline_consistent_nodes_order(self, mocker):
-        """
-        Pipeline that have multiple possible execution orders should have consistent
-        solutions
-        Possible Solutions:
-        1. A -> B -> C -> D -> E -> F
-        2. B -> A -> C -> D -> E -> F
-        3 ... Any permutation as long as F is executed last.
-
-        Although we are not sure which permutation it is, but it should always output
-        the same permutation.
-
-        A-- \
-        B--- \
-        C---- F
-        D--- /
-        E-- /
-        """
-
-        def multiconcat(*args):
-            return sum(args)
-
-        mock_hash = mocker.patch(f"{__name__}.Node.__hash__")
-        expected_sorted_nodes: List[List[Node]] = None
-
-        # Repeat 10 times so we can be sure it is not purely by chance
-        for _ in range(10):
-            mock_hash.return_value = random.randint(1, 1e20)
-
-            inverted_fork_dags = Pipeline(
-                [
-                    node(constant_output, None, "A"),
-                    node(constant_output, None, "B"),
-                    node(constant_output, None, "C"),
-                    node(constant_output, None, "D"),
-                    node(constant_output, None, "E"),
-                    node(multiconcat, ["A", "B", "C", "D", "E"], "F"),
-                ]
-            )
-            if not expected_sorted_nodes:
-                expected_sorted_nodes = inverted_fork_dags.nodes
-
-            else:
-
-                assert expected_sorted_nodes == inverted_fork_dags.nodes
-
 
 class TestPipelineDescribe:
     def test_names_only(self, str_node_inputs_list):

From 4cc5f85cc9ce0ee50c4f0518469659ff1d434c6b Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Thu, 16 Jun 2022 11:06:05 +0100
Subject: [PATCH 7/9] Minor refactoring with list comprehension

Signed-off-by: Nok Chan
---
 kedro/pipeline/pipeline.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py
index 637b6975dd..029f7aa428 100644
--- a/kedro/pipeline/pipeline.py
+++ b/kedro/pipeline/pipeline.py
@@ -894,10 +894,8 @@ def _circle_error_message(error_data: Dict[str, str]) -> str:
         return f"Circular dependencies exist among these items: {circular}"
 
     try:
-        result = []
-        for dependencies in toposort(node_dependencies):
-            # Sort it so it has consistent order when run with SequentialRunner
-            result.append(sorted(dependencies))
+        # Sort it so it has consistent order when run with SequentialRunner
+        result = [sorted(dependencies) for dependencies in toposort(node_dependencies)]
         return result
     except ToposortCircleError as exc:
         message = _circle_error_message(exc.data)

From 120b40daba3805a76942400b14ec988e0afab5ec Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Thu, 16 Jun 2022 11:17:19 +0100
Subject: [PATCH 8/9] Fix Linting

Signed-off-by: Nok Chan
---
 tests/pipeline/test_pipeline.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 11ae531278..14a01a20a0 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -1,13 +1,10 @@
-import random
 import re
 from itertools import chain
-from typing import List
 
 import pytest
 
 import kedro
 from kedro.pipeline import Pipeline, node
-from kedro.pipeline.node import Node
 from kedro.pipeline.pipeline import (
     CircularDependencyError,
     ConfirmNotUniqueError,

From 90126e6287fc5b69551e3be842aa3e2ee6f7de35 Mon Sep 17 00:00:00 2001
From: Nok Chan
Date: Thu, 16 Jun 2022 12:13:50 +0100
Subject: [PATCH 9/9] minor update of release notes

Signed-off-by: Nok Chan
---
 RELEASE.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/RELEASE.md b/RELEASE.md
index 87dd9b0d4b..57ec4d926a 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -25,7 +25,7 @@
 * Reduced number of log lines by changing the logging level from `INFO` to `DEBUG` for low priority messages.
 * Kedro's framework-side logging configuration no longer performs file-based logging. Hence superfluous `info.log`/`errors.log` files are no longer created in your project root, and running Kedro on read-only file systems such as Databricks Repos is now possible.
 * The `root` logger is now set to the Python default level of `WARNING` rather than `INFO`. Kedro's logger is still set to emit `INFO` level messages.
-* Kedro pipeline will have consistent execution order given the same set of nodes when using with `SequentialRunner`.
+* `SequentialRunner` now has consistent execution order across multiple runs with sorted nodes.
 * Bumped the upper bound for the Flake8 dependency to <5.0.
 * `kedro jupyter notebook/lab` no longer reuses a Jupyter kernel.
 * Required `cookiecutter>=2.1.1` to address a [known command injection vulnerability](https://security.snyk.io/vuln/SNYK-PYTHON-COOKIECUTTER-2414281).
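Taken together, the series means that constructing the same pipeline twice now yields `nodes` and `grouped_nodes` in the same order, which is what the removed unit test probed by randomising `Node.__hash__`. A short usage sketch against the public API, mirroring that inverted-fork DAG; `constant_output` and `multiconcat` are stand-ins defined here for illustration, not part of the final patches, and the snippet assumes a Kedro version that includes this change:

    from kedro.pipeline import Pipeline, node

    def constant_output():
        return 1

    def multiconcat(*args):
        return sum(args)

    pipeline = Pipeline(
        [node(constant_output, None, letter, name=letter) for letter in "ABCDE"]
        + [node(multiconcat, ["A", "B", "C", "D", "E"], "F", name="F")]
    )

    # With the sorted toposort groups, both of these are stable across runs.
    print([n.name for n in pipeline.nodes])
    print([[n.name for n in group] for group in pipeline.grouped_nodes])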