Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Namespace Pipelines #1897

Merged
merged 50 commits into from
Jul 2, 2024
Merged

Refactor Namespace Pipelines #1897

merged 50 commits into from
Jul 2, 2024

Conversation

rashidakanchwala
Copy link
Contributor

@rashidakanchwala rashidakanchwala commented May 9, 2024

Description

Resolves #1899 , #1814

Development notes

To ease review process for - #1897 , created the below PRs

QA notes

Example modular pipeline tree:

"modular_pipelines": {
        "__root__": {
            "id": "__root__",
            "name": "__root__",
            "inputs": [],
            "outputs": [],
            "children": [
                {
                    "id": "feature_engineering",
                    "type": "modularPipeline"
                },
                {
                    "id": "b5609df0",
                    "type": "parameters"
                },
                {
                    "id": "f6d9538c",
                    "type": "data"
                },
            …
            ]
        },
        "feature_engineering": {
            "id": "feature_engineering",
            "name": "feature_engineering",
            "inputs": [
                "abed6a4d",
                "f063cc82",
            …
            ],
            "outputs": [
                "23c94afb",
                "1e3cc50a"
            …
            ],
            "children": [
                {
                    "id": "8e4f1015",
                    "type": "data"
                },
                {
                    "id": "04ba733a",
                    "type": "task"
                },
            …
            ]
        },
        …
    }

Current issues in constructing the modular pipeline tree:

  1. How we determine internal_inputs/outputs, external_inputs/outputs based on namespace and not on what kedro returns. Since datasets do not have a namespace (i.e., only kedro node and pipeline have namespaces) this raised issues in determining the actual inputs/outputs of a nested modular pipeline.
  2. Inheriting input/output datasets to parent modular pipeline when nested. This made few datasets to appear in the root modular pipeline even though they are not free output datasets.
  3. Readability/Maintenance issues in case of nested modular pipelines, as we did not define rules in adding a modular pipeline child, inputs and outputs for a modular pipeline
  4. On the UI, modular pipeline focus was missing associated inputs/outputs from getting highlighted in the node menu as dataset nodes do not have namespace, the associated modular_pipelines were always empty.

Incorrect rendering of nodes :

Issues raised by users -

How does this PR resolve the issues:

  1. Determines inputs/outputs to a modular pipeline based on what kedro returns.
  2. Removes the concept of internal/external inputs/outputs datasets for modular pipelines. There are only inputs/outputs for a modular pipeline. (Thanks to @idanov)
  3. Creates helper functions with rules, to deal with adding inputs/outputs and children to a modular pipeline.
  4. Populates modular pipeline tree before creating task/data nodes, which eliminates the need to calculate modular pipelines while creating the nodes using namespaces

Core parts that changed:

  1. Added helper methods populate_tree, add_children, _add_datasets_as_children, _add_children_to_parent_pipeline to ModularPipelinesRepository. (Thanks to @rashidakanchwala)
  2. While adding each KedroPipeline to Kedro-Viz data repositories, DataAccessManager calls populate_tree to resolve the construction of modular_pipelines_tree for the registered pipeline
  3. Inputs/Outputs for a modular pipeline are calculated using public apis available via Kedro (inputs(), outputs(), all_outputs(), only_nodes_with_namespace())
  4. Calculating children now have set of rules defined in the docstrings of add_children and other helper functions

Code Flow doc:

Please find further information at Refactor_Modular_Pipelines.docx

Modular Pipelines UI Rendering:

UseCase 1: When a modular pipeline output (dataset_3) is used as an input to another function of the same modular pipeline.

def create_pipeline(**kwargs) -> Pipeline:
    new_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_in",
                 outputs="dataset_1",
                 name="step1"),
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_2",
                 name="step2"),
            node(lambda x: x,
                 inputs="dataset_2",
                 outputs="dataset_3",
                 name="step3"),
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_out",
                 name="step4"
            )
        ],
            namespace="main_pipeline",
        inputs=None,
        outputs={"dataset_out", "dataset_3"}
    )
    return new_pipeline

Before:

image

After:

image

UseCase 2: When a nested modular pipeline output (dataset_3) is used as an input to the outer modular pipeline

def create_pipeline(**kwargs) -> Pipeline:

    sub_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_2",
                 name="step2"),
            node(lambda x: x,
                 inputs="dataset_2",
                 outputs="dataset_3",
                 name="step3"),
        ],
        inputs={"dataset_1"},
        outputs={"dataset_3"},
        namespace="sub_pipeline"
    )
    new_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_in",
                 outputs="dataset_1",
                 name="step1"),
            sub_pipeline,
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_1_2",
                 name="step1_2"),
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_4",
                 name="step4"
            )
        ],
            namespace="main_pipeline",
        inputs=None,
        outputs={"dataset_3","dataset_4"}
    )
    return new_pipeline

Before:

image

After:

image


UseCase 3: When a nested modular pipeline output (dataset_3) is used as an input to the outer modular pipeline and also used as an input to another external modular pipeline

def create_pipeline(**kwargs) -> Pipeline:

    sub_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_2",
                 name="step2"),
            node(lambda x: x,
                 inputs="dataset_2",
                 outputs="dataset_3",
                 name="step3"),
        ],
        inputs={"dataset_1"},
        outputs={"dataset_3"},
        namespace="sub_pipeline"
    )
    new_pipeline = pipeline(
        [
            node(lambda x: x,
                 inputs="dataset_in",
                 outputs="dataset_1",
                 name="step1"),
            sub_pipeline,
            node(lambda x: x,
                 inputs="dataset_1",
                 outputs="dataset_1_2",
                 name="step1_2"),
            node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_4",
                 name="step4"
            )
        ],
            namespace="main_pipeline",
        inputs=None,
        outputs={"dataset_3","dataset_4"}
    )

    other = pipeline([
        node(lambda x: x,
                 inputs="dataset_3",
                 outputs="dataset_5",
                 name="step5"
            )
    ],
    namespace="other_pipeline",
    inputs={"dataset_3"},
    outputs={"dataset_5"}
    )

    return new_pipeline + other

Before:

image

After:

image

UseCase 4: When an output of a namespace function (using node namespaces) (dataset_7, dataset_9) is an input to another function in the same namespace

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_1", "dataset_2"],
                outputs="dataset_3",
                name="first_node",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_3", "dataset_4"],
                outputs="dataset_5",
                name="second_node",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_5", "dataset_6"],
                outputs="dataset_7", 
                name="third_node",
                namespace="namespace_prefix_1",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_7", "dataset_8"],
                outputs="dataset_9",
                name="fourth_node",
                namespace="namespace_prefix_1",
            ),
            node(
                func=lambda dataset_1, dataset_2: (dataset_1, dataset_2),
                inputs=["dataset_9", "dataset_10"],
                outputs="dataset_11",
                name="fifth_node",
                namespace="namespace_prefix_1",
            ),
        ]
    )

Before:

image

After:

image

UseCase 5: When an output of a nested modular pipeline (model_inputs) is an input to another nested modular pipeline

def create_pipeline(**kwargs) -> Pipeline:
    data_processing_pipeline = pipeline(
        [
            node(
                lambda x: x,
                inputs=["raw_data"],
                outputs="model_inputs",
                name="process_data",
                tags=["split"],
            )
        ],
        namespace="uk.data_processing",
        outputs="model_inputs",
    )
    data_science_pipeline = pipeline(
        [
            node(
                lambda x: x,
                inputs=["model_inputs"],
                outputs="model",
                name="train_model",
                tags=["train"],
            )
        ],
        namespace="uk.data_science",
        inputs="model_inputs",
    )
    return data_processing_pipeline + data_science_pipeline

Before:

image

After:

image

UseCase 6: Nested namespace pipelines with single input (input_to_processing) and single output (output_from_processing)

def _get_generic_pipe() -> Pipeline:
    return Pipeline([
        node(
            func=lambda x: x,
            inputs="input_df",
            outputs="output_df",
        ),
    ])


def create_pipeline(**kwargs) -> Pipeline:
    pipe = Pipeline([
        pipeline(
            pipe=_get_generic_pipe(),
            inputs={"input_df": "input_to_processing"},
            outputs={"output_df": "post_first_pipe"},
            namespace="first_processing_step",
        ),
        pipeline(
            pipe=_get_generic_pipe(),
            inputs={"input_df": "post_first_pipe"},
            outputs={"output_df": "output_from_processing"},
            namespace="second_processing_step",
        ),
    ])
    return pipeline(
        pipe=pipe,
        inputs="input_to_processing",
        outputs="output_from_processing",
        namespace="processing",
    )

Before:

image

After:

image

Modular Pipelines expand and collapse in action:

Before:

UseCase 1-4:

UseCase1-4

UseCase 5-6:

UseCase5-6

After:

UseCase 1-4:

UseCase1-4_after

UseCase 5-6:

UseCase5-6_after

Checklist

  • Read the contributing guidelines
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added new entries to the RELEASE.md file
  • Added tests to cover my changes

ravi-kumar-pilla and others added 17 commits April 24, 2024 19:27
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
ravi-kumar-pilla and others added 12 commits June 11, 2024 09:14
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
…aft/refactor-mod-pipe

Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
@rashidakanchwala rashidakanchwala changed the title Draft/refactor mod pipe Refactor Namespace Pipelines Jul 2, 2024
@rashidakanchwala rashidakanchwala marked this pull request as ready for review July 2, 2024 12:52
@rashidakanchwala rashidakanchwala requested a review from jitu5 as a code owner July 2, 2024 12:52
@rashidakanchwala rashidakanchwala requested a review from merelcht July 2, 2024 12:59
Copy link
Member

@merelcht merelcht left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Giving final approval after having reviewed and approved all the sub PRs.

@ravi-kumar-pilla ravi-kumar-pilla self-requested a review July 2, 2024 13:33
Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
Copy link
Contributor

@ravi-kumar-pilla ravi-kumar-pilla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍 ... adding release note on the refactor and the bug fix should be great. Thank you @rashidakanchwala

@rashidakanchwala rashidakanchwala merged commit 35d351f into main Jul 2, 2024
40 checks passed
@rashidakanchwala rashidakanchwala deleted the draft/refactor-mod-pipe branch July 2, 2024 14:17
@SajidAlamQB SajidAlamQB mentioned this pull request Jul 25, 2024
5 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Refactor Modular Pipelines in the Kedro-viz backend
3 participants