Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Namespace Pipelines #1897

Merged
merged 50 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
0e7f24d
merge main from remote
ravi-kumar-pilla Apr 25, 2024
c1aae75
Merge branch 'main' of https://github.com/kedro-org/kedro-viz
ravi-kumar-pilla Apr 26, 2024
177ccbc
merging remote
ravi-kumar-pilla May 1, 2024
8ecf9bf
Merge branch 'main' of https://github.com/kedro-org/kedro-viz
ravi-kumar-pilla May 2, 2024
9e3040e
testing in progress
ravi-kumar-pilla May 3, 2024
54a4574
wip
ravi-kumar-pilla May 5, 2024
5b8235e
tested namespace pipelines
ravi-kumar-pilla May 6, 2024
8174222
update comments
ravi-kumar-pilla May 6, 2024
76be4da
update function comments
ravi-kumar-pilla May 6, 2024
6faee38
update pytests
ravi-kumar-pilla May 6, 2024
69d0c9c
update pytests
ravi-kumar-pilla May 7, 2024
3383da0
test branch
rashidakanchwala May 7, 2024
5e7a891
WIP
rashidakanchwala May 8, 2024
d2188bc
wip
rashidakanchwala May 8, 2024
fac44e0
good prog
rashidakanchwala May 8, 2024
b253629
still WIP
rashidakanchwala May 10, 2024
4ad7acc
in working condition
rashidakanchwala May 13, 2024
47c044e
push in a good state
rashidakanchwala May 14, 2024
cce8e96
finally
rashidakanchwala May 14, 2024
ad67a8d
update pipeline_registry
rashidakanchwala May 14, 2024
09efff4
fix params
rashidakanchwala May 14, 2024
ea7750e
Merge branch 'main' into draft/refactor-mod-pipe
rashidakanchwala May 14, 2024
58cc320
fixed deep nesting
rashidakanchwala May 16, 2024
78b66aa
remove unwanted prints
rashidakanchwala May 16, 2024
9fa5da5
update pipeline_registry
rashidakanchwala May 20, 2024
ae2bafa
Merge branch 'main' into draft/refactor-mod-pipe
rashidakanchwala May 21, 2024
c84e5c6
add ignored datasets to root modular pipeline
ravi-kumar-pilla Jun 1, 2024
f292f22
refactor to specific type
ravi-kumar-pilla Jun 3, 2024
bebc452
merge main
ravi-kumar-pilla Jun 3, 2024
536468f
partially working tests
ravi-kumar-pilla Jun 6, 2024
a2aca7b
update permissions
ravi-kumar-pilla Jun 6, 2024
73394e0
adjust pytest for responses
ravi-kumar-pilla Jun 6, 2024
373d033
working stage with pytests, lint, format - need minor improvements to…
ravi-kumar-pilla Jun 7, 2024
950a829
pytests updated and fixed lint
ravi-kumar-pilla Jun 8, 2024
299fabe
fix js test
ravi-kumar-pilla Jun 10, 2024
c615e88
revert run command change
ravi-kumar-pilla Jun 11, 2024
bf827b5
revert namespace model change
ravi-kumar-pilla Jun 11, 2024
5ec81da
add comments and test
ravi-kumar-pilla Jun 11, 2024
9f0130f
update comment
ravi-kumar-pilla Jun 11, 2024
53752be
address PR comments1
ravi-kumar-pilla Jun 19, 2024
32c9f21
Merge branch 'main' of https://github.com/kedro-org/kedro-viz into dr…
ravi-kumar-pilla Jun 22, 2024
15bda80
add meaningful test names
ravi-kumar-pilla Jun 22, 2024
d4371a9
refactor tests
ravi-kumar-pilla Jun 24, 2024
1d87476
fix based on Ivan's comments
rashidakanchwala Jun 25, 2024
9326383
address PR comments2
ravi-kumar-pilla Jun 28, 2024
359d59b
improve pytest readability
ravi-kumar-pilla Jul 2, 2024
eaae032
added unit tests to mod-pipeline-repo
rashidakanchwala Jul 2, 2024
29f9835
adjust file permissions
ravi-kumar-pilla Jul 2, 2024
d355913
Merge branch 'main' into draft/refactor-mod-pipe
ravi-kumar-pilla Jul 2, 2024
f0978fa
add release notes
rashidakanchwala Jul 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions cypress/tests/ui/flowchart/menu.cy.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,18 @@ describe('Flowchart Menu', () => {
).click();

// Assert after action
cy.__checkForText__(
'.pipeline-node--active > .pipeline-node__text',
prettifyName(nodeToFocusText)
cy.get('.pipeline-node--active > .pipeline-node__text')
.invoke('text')
.then((focusedNodesText) =>
expect(focusedNodesText.toLowerCase()).to.contains(
prettifyName(nodeToFocusText).toLowerCase()
)
);
cy.get('.pipeline-node--active > .pipeline-node__text').should(
'have.length',
5
);

cy.get('.pipeline-node').should('have.length', 5);
});

Expand Down
3 changes: 1 addition & 2 deletions demo-project/src/demo_project/pipeline_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from demo_project.pipelines import modelling as mod
from demo_project.pipelines import reporting as rep


def register_pipelines() -> Dict[str, Pipeline]:
"""Register the project's pipelines.

Expand All @@ -25,7 +24,7 @@ def register_pipelines() -> Dict[str, Pipeline]:
)

reporting_pipeline = rep.create_pipeline()

return {
"__default__": (
ingestion_pipeline
Expand Down
123 changes: 91 additions & 32 deletions package/kedro_viz/data_access/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
TaskNode,
TranscodedDataNode,
)
from kedro_viz.services import layers_services, modular_pipelines_services
from kedro_viz.utils import _strip_transcoding
from kedro_viz.services import layers_services
from kedro_viz.utils import _strip_transcoding, is_dataset_param

from .repositories import (
CatalogRepository,
Expand Down Expand Up @@ -149,15 +149,21 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
pipeline: The Kedro pipeline instance to convert to graph models
and add to relevant repositories representing the graph.
"""
modular_pipelines = self.modular_pipelines[registered_pipeline_id]
self.registered_pipelines.add_pipeline(registered_pipeline_id)
modular_pipelines_repo_obj = self.modular_pipelines[registered_pipeline_id]
modular_pipelines_repo_obj.populate_tree(pipeline)

free_inputs = pipeline.inputs()

for node in pipeline.nodes:
task_node = self.add_node(registered_pipeline_id, node)
self.registered_pipelines.add_node(registered_pipeline_id, task_node.id)
# Add a Kedro node as a TaskNode to the NodesRepository
# for a given registered pipeline ID
task_node = self.add_node(
registered_pipeline_id, node, modular_pipelines_repo_obj
)

current_modular_pipeline_id = modular_pipelines.extract_from_node(task_node)
# Add the task node created above to RegisteredPipelinesRepository
self.registered_pipelines.add_node(registered_pipeline_id, task_node.id)

# Add node's inputs as DataNode to the graph
for input_ in node.inputs:
Expand All @@ -166,26 +172,26 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
# because free inputs to the pipeline can't be transcoded.
is_free_input = input_ in free_inputs
input_node = self.add_node_input(
registered_pipeline_id, input_, task_node, is_free_input
registered_pipeline_id,
input_,
task_node,
modular_pipelines_repo_obj,
is_free_input,
)
self.registered_pipelines.add_node(
registered_pipeline_id, input_node.id
)
if isinstance(input_node, TranscodedDataNode):
input_node.transcoded_versions.add(self.catalog.get_dataset(input_))

# Add the input as an input of the task_node's modular_pipeline, if any.
# The method `add_input` will take care of figuring out whether
# it is an internal or external input of the modular pipeline.
modular_pipelines.extract_from_node(input_node)
if current_modular_pipeline_id is not None:
modular_pipelines.add_input(current_modular_pipeline_id, input_node)

# Add node outputs as DataNode to the graph.
# It follows similar logic to adding inputs.
for output in node.outputs:
output_node = self.add_node_output(
registered_pipeline_id, output, task_node
registered_pipeline_id,
output,
task_node,
modular_pipelines_repo_obj,
)
self.registered_pipelines.add_node(
registered_pipeline_id, output_node.id
Expand All @@ -194,23 +200,30 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
output_node.original_name = output
output_node.original_version = self.catalog.get_dataset(output)

modular_pipelines.extract_from_node(output_node)
if current_modular_pipeline_id is not None:
modular_pipelines.add_output(
current_modular_pipeline_id, output_node
)

def add_node(self, registered_pipeline_id: str, node: KedroNode) -> TaskNode:
def add_node(
self,
registered_pipeline_id: str,
node: KedroNode,
modular_pipelines_repo_obj: ModularPipelinesRepository,
) -> TaskNode:
"""Add a Kedro node as a TaskNode to the NodesRepository
for a given registered pipeline ID.

Args:
registered_pipeline_id: The registered pipeline ID to which the node belongs.
node: The Kedro node to add as TaskNode.
modular_pipelines_repo_obj: The modular pipelines repository
instance for the registered pipeline.
Returns:
The GraphNode instance representing the Kedro node that was added to the graph.
"""
task_node: TaskNode = self.nodes.add_node(GraphNode.create_task_node(node))
(
node_id,
modular_pipeline_ids,
) = modular_pipelines_repo_obj.get_node_and_modular_pipeline_mapping(node)
task_node: TaskNode = self.nodes.add_node(
GraphNode.create_task_node(node, node_id, modular_pipeline_ids)
)
task_node.add_pipeline(registered_pipeline_id)
self.tags.add_tags(task_node.tags)
return task_node
Expand All @@ -220,6 +233,7 @@ def add_node_input(
registered_pipeline_id: str,
input_dataset: str,
task_node: TaskNode,
modular_pipelines_repo_obj: ModularPipelinesRepository,
is_free_input: bool = False,
) -> Union[DataNode, TranscodedDataNode, ParametersNode]:
"""Add a Kedro node's input as a DataNode, TranscodedDataNode or ParametersNode
Expand All @@ -229,13 +243,18 @@ def add_node_input(
registered_pipeline_id: The pipeline ID to which the node's input belongs.
input_dataset: The input dataset of the TaskNode.
task_node: The TaskNode to add input to.
modular_pipelines_repo_obj: The modular pipelines repository
instance for the registered pipeline.
is_free_input: Whether the input is a free input to the pipeline.
Returns:
The GraphNode instance representing the node's input that was added to the graph.
"""

graph_node = self.add_dataset(
registered_pipeline_id, input_dataset, is_free_input=is_free_input
registered_pipeline_id,
input_dataset,
modular_pipelines_repo_obj,
is_free_input=is_free_input,
)
graph_node.tags.update(task_node.tags)
self.edges[registered_pipeline_id].add_edge(
Expand All @@ -250,7 +269,11 @@ def add_node_input(
return graph_node

def add_node_output(
self, registered_pipeline_id: str, output_dataset: str, task_node: TaskNode
self,
registered_pipeline_id: str,
output_dataset: str,
task_node: TaskNode,
modular_pipelines_repo_obj: ModularPipelinesRepository,
) -> Union[DataNode, TranscodedDataNode, ParametersNode]:
"""Add a Kedro node's output as a DataNode, TranscodedDataNode or ParametersNode
to the NodesRepository for a given registered pipeline ID.
Expand All @@ -259,10 +282,14 @@ def add_node_output(
registered_pipeline_id: The pipeline ID to which the node's output belongs.
output_dataset: The output dataset of the TaskNode.
task_node: The TaskNode to add output to.
modular_pipelines_repo_obj: The modular pipelines repository
instance for the registered pipeline.
Returns:
The GraphNode instance representing the node's output that was added to the graph.
"""
graph_node = self.add_dataset(registered_pipeline_id, output_dataset)
graph_node = self.add_dataset(
registered_pipeline_id, output_dataset, modular_pipelines_repo_obj
)
graph_node.tags.update(task_node.tags)
self.edges[registered_pipeline_id].add_edge(
GraphEdge(source=task_node.id, target=graph_node.id)
Expand All @@ -274,6 +301,7 @@ def add_dataset(
self,
registered_pipeline_id: str,
dataset_name: str,
modular_pipelines_repo_obj: ModularPipelinesRepository,
is_free_input: bool = False,
) -> Union[DataNode, TranscodedDataNode, ParametersNode]:
"""Add a Kedro dataset as a DataNode, TranscodedDataNode or ParametersNode
Expand All @@ -282,27 +310,58 @@ def add_dataset(
Args:
registered_pipeline_id: The registered pipeline ID to which the dataset belongs.
dataset_name: The name of the dataset.
modular_pipelines_repo_obj: The modular pipelines repository
instance for the registered pipeline.
is_free_input: Whether the dataset is a free input to the registered pipeline.
Returns:
The GraphNode instance representing the dataset that was added to the NodesRepository.
"""
obj = self.catalog.get_dataset(dataset_name)
layer = self.catalog.get_layer_for_dataset(dataset_name)
graph_node: Union[DataNode, TranscodedDataNode, ParametersNode]
if self.catalog.is_dataset_param(dataset_name):
(
dataset_id,
modular_pipeline_ids,
) = modular_pipelines_repo_obj.get_node_and_modular_pipeline_mapping(
dataset_name
)

# add datasets that are not part of a modular pipeline
# as a child to the root modular pipeline
if modular_pipeline_ids is None:
root_modular_pipeline_node = (
modular_pipelines_repo_obj.get_or_create_modular_pipeline(
ROOT_MODULAR_PIPELINE_ID
)
)
root_modular_pipeline_node.children.add(
ModularPipelineChild(id=dataset_id, type=GraphNodeType.DATA)
)

# update the node_mod_pipeline_map
if dataset_id not in modular_pipelines_repo_obj.node_mod_pipeline_map:
modular_pipelines_repo_obj.node_mod_pipeline_map[dataset_id] = {
ROOT_MODULAR_PIPELINE_ID
}

if is_dataset_param(dataset_name):
graph_node = GraphNode.create_parameters_node(
dataset_id=dataset_id,
dataset_name=dataset_name,
layer=layer,
tags=set(),
parameters=obj,
modular_pipelines=None,
)
else:
graph_node = GraphNode.create_data_node(
dataset_id=dataset_id,
dataset_name=dataset_name,
layer=layer,
tags=set(),
dataset=obj,
stats=self.get_stats_for_data_node(_strip_transcoding(dataset_name)),
modular_pipelines=modular_pipeline_ids,
is_free_input=is_free_input,
)
graph_node = self.nodes.add_node(graph_node)
Expand Down Expand Up @@ -415,10 +474,10 @@ def create_modular_pipelines_tree_for_registered_pipeline(

edges = self.edges[registered_pipeline_id]
node_dependencies = self.node_dependencies[registered_pipeline_id]
modular_pipelines = self.modular_pipelines[registered_pipeline_id]
modular_pipelines_tree = modular_pipelines_services.expand_tree(
modular_pipelines.as_dict()
)
modular_pipelines_tree = self.modular_pipelines[
registered_pipeline_id
].as_dict()

root_parameters = set()

# turn all modular pipelines in the tree into a graph node for visualisation,
Expand Down Expand Up @@ -498,7 +557,7 @@ def create_modular_pipelines_tree_for_registered_pipeline(
or not node.belongs_to_pipeline(registered_pipeline_id)
):
continue
if not node.modular_pipelines or node_id in root_parameters:
if node.modular_pipelines is None or node_id in root_parameters:
modular_pipelines_tree[ROOT_MODULAR_PIPELINE_ID].children.add(
ModularPipelineChild(
id=node_id, type=self.nodes.get_node_by_id(node_id).type
Expand Down
7 changes: 0 additions & 7 deletions package/kedro_viz/data_access/repositories/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,3 @@ def as_dict(self) -> Dict[str, Optional["AbstractDataset"]]:
for dataset_name in self._catalog.list()
if self.get_dataset(dataset_name) is not None
}

@staticmethod
def is_dataset_param(dataset_name: str) -> bool:
"""Return whether a dataset is a parameter"""
return (
dataset_name.lower().startswith("params:") or dataset_name == "parameters"
)
Loading
Loading