
Refactor data access repositories for modular pipeline change #1938

Conversation

@ravi-kumar-pilla (Contributor) commented Jun 10, 2024

Description

Partially resolves #1899

Development notes

  • Create a dictionary mapping nodes to modular pipelines for faster lookup
  • Add a parameters set to keep track of parameters (parameter nodes do not have modular pipelines)
  • Add a populate_tree method which handles inputs/outputs and children for all the modular pipelines within a registered pipeline
  • Add an explode_namespace method which expands a nested namespace (see the sketch after this list)
  • Modify the add_inputs and add_outputs methods to take a Set[str] of inputs and outputs calculated in the populate_tree method
  • Add an add_children method which resolves the modular pipeline's child nodes
  • Add an _add_children_to_parent_pipeline method which adds children to the parent modular pipeline in case of nesting
  • Add an _add_datasets_as_children method which adds datasets as children of the modular pipeline
  • Update tests
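
For context on the namespace expansion, here is a minimal sketch of the behavior described in the explode_namespace bullet above; the body is an assumption, not the exact code in this diff:

from typing import List

def explode_namespace(nested_namespace: str) -> List[str]:
    # Expand a nested namespace into itself plus all ancestor namespaces,
    # e.g. "uk.data_science.model" ->
    #      ["uk", "uk.data_science", "uk.data_science.model"]
    parts = nested_namespace.split(".")
    return [".".join(parts[: i + 1]) for i in range(len(parts))]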

QA notes

  • This PR is part of a bigger refactor of modular pipelines (Refactor Namespace Pipelines #1897) and was created to ease the review process.
  • The CI build might fail as this PR is not self-contained.

Checklist

  • Read the contributing guidelines
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added new entries to the RELEASE.md file
  • Added tests to cover my changes

Signed-off-by: ravi-kumar-pilla <ravi_kumar_pilla@mckinsey.com>
@ravi-kumar-pilla mentioned this pull request on Jun 10, 2024
@merelcht (Member) left a comment:
I've done an initial review of this part, but want to have another look after reviewing the other parts. I've left comments mostly related to readability of the code.

def add_inputs(self, modular_pipeline_id: str, inputs: Set[str]) -> None:
    """
    Add input datasets to the modular pipeline.
Member:

In the code there's also a special case for parameters. I think it makes sense to mention that in the docstring.

Contributor (Author):

Yes, there is a special case for parameters, but that happens while adding the datasets as children; it is covered in the docstrings for add_children, _add_children_to_parent_pipeline and _add_datasets_as_children. The method here only adds inputs to the modular pipeline (like the free_inputs hanging outside the modular pipeline).

Member:

I still think it's a good idea to describe the following code in the docstring:

if is_dataset_param(_input):
    self.parameters.add(hashed_input)
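
A sketch of how the docstring could mention this, as suggested; the wording is an assumption, grounded in the development notes above:

def add_inputs(self, modular_pipeline_id: str, inputs: Set[str]) -> None:
    """Add input datasets to the modular pipeline.

    Inputs that are parameters are also tracked in self.parameters,
    since parameter nodes do not have modular pipelines.
    """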

    self.tree[modular_pipeline_id].internal_outputs.add(output_node.id)
else:
    self.tree[modular_pipeline_id].external_outputs.add(output_node.id)

def _hash_input_output(self, item: str) -> str:
Member:

Why does this need to take self? As far as I understand, this method just hashes an input/output, so it doesn't need an instance of ModularPipelinesRepository.

Contributor (Author):

Yes. I'd like to know where helper functions like this should be placed. I used to put all helper functions in utils as public, but there must be some rules I'm not aware of. Could you please let me know? Thanks.

They could be placed:

  1. In a class, as a static method (public/private)
  2. Outside a class but in the same file (public/private)
  3. Inside utils (public/private)

Member:

This Stack Overflow thread, specifically the first two answers, describes quite well the general rule of thumb for static methods, class methods and module functions: https://stackoverflow.com/questions/11788195/module-function-vs-staticmethod-vs-classmethod-vs-no-decorators-which-idiom-is

In the Kedro codebase we generally apply the following:

  1. Inside a class as a static method: any method that is only used by that class; the difference with a module function is that these methods are specific to the class and don't have any meaning outside of it (e.g. https://github.com/kedro-org/kedro/blob/main/kedro/io/data_catalog.py#L336-L338)
  2. Outside of a class but in the same file: any helper that is only used in that specific file should be a module function at the top of the file. These methods are only used in that file/class but don't require any "knowledge" of the class: there's no need to access a class instance, and the method has meaning outside the context of the class (e.g. https://github.com/kedro-org/kedro/blob/main/kedro/framework/session/session.py#L33-L56)
  3. Inside utils: when it's a helper used in multiple files and not specific to a certain class.

When it comes to making things public/private, it's a bit different for Viz vs. Kedro, but in the framework anything public is part of the public API, so it can (and will) be used by outside users accessing Kedro directly as a library. That means, unless a user should have access to a method, we make it private. My personal opinion is that for Viz we should make things public when they're accessed in multiple classes/files.
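
To make the three placements concrete, here is a minimal sketch with entirely hypothetical names (not taken from the Kedro codebase):

# 3. utils.py: a helper used across multiple files, not tied to any class
def format_dataset_name(name: str) -> str:
    return name.strip().lower()


# 2. A module function at the top of the file that uses it: no class
#    "knowledge" needed, but only used in this one file
def _parse_env_name(raw: str) -> str:
    return raw.split("/")[-1]


class SessionStore:
    # 1. A static method: only meaningful in the context of this class
    @staticmethod
    def _validate_run_id(run_id: str) -> bool:
        return run_id.isalnum()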

Contributor (Author):

Thank you for the explanation Merel 💯

    self.add_outputs(modular_pipeline_id, free_outputs)
    self.add_children(modular_pipeline_id, sub_pipeline.nodes)

def _explode_namespace(self, nested_namespace: str) -> List[str]:
Member:

Why does this need to take self? It doesn't need an instance of ModularPipelinesRepository, so it can be a static method or be defined outside the ModularPipelinesRepository class altogether.

Contributor (Author):

Yes, converted to a static method. Thank you.

@ravi-kumar-pilla marked this pull request as ready for review on June 24, 2024, 15:01
    kedro_node: Node,
    modular_pipeline_inputs_outputs: Set[str],
):
    """Helper to add datasets (not parameters) related to task nodes as children.
Contributor:

I was thinking we could add datasets that have no modular pipeline ID associated with them (and thus belong to the root modular pipeline) in this function instead of in managers.py.

@idanov (Member) left a comment:

From a correctness point of view, this looks good as far as I can tell. There are a few things that can be done to make the style of the code a bit more readable. I've added some comments here and there with pointers. Happy to discuss on a call if you have questions.

Comment on lines 76 to 86
sub_pipeline = pipeline.only_nodes_with_namespace(modular_pipeline_id)
rest_of_the_pipeline = pipeline - sub_pipeline

free_inputs = sub_pipeline.inputs()
free_outputs = sub_pipeline.outputs() | (
    rest_of_the_pipeline.inputs() & sub_pipeline.all_outputs()
)

self._add_inputs(modular_pipeline_id, free_inputs)
self._add_outputs(modular_pipeline_id, free_outputs)
self._add_children(modular_pipeline_id, sub_pipeline.nodes)
Member:

Probably worth explaining the code here with a couple of comments, so we don't reintroduce the bugs we had before by accidentally modifying it in the future. In short, the logic is as follows:

  • We extract all nodes within a certain namespace as a subpipeline
  • We consider all free inputs to be the inputs of the subpipeline
  • We consider all free outputs to be the outputs of the subpipeline
  • We also add as outputs all intermediary outputs that are consumed by nodes external to the subpipeline

Probably could be explained in fewer and shorter sentences.
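
A sketch of how those comments might read, reusing the snippet quoted above (the comments paraphrase the bullets and are not part of the diff):

sub_pipeline = pipeline.only_nodes_with_namespace(modular_pipeline_id)
rest_of_the_pipeline = pipeline - sub_pipeline

# Free inputs of the subpipeline become the modular pipeline's inputs.
free_inputs = sub_pipeline.inputs()

# Free outputs of the subpipeline, plus any intermediary outputs that
# nodes outside the subpipeline consume, become its outputs.
free_outputs = sub_pipeline.outputs() | (
    rest_of_the_pipeline.inputs() & sub_pipeline.all_outputs()
)

self._add_inputs(modular_pipeline_id, free_inputs)
self._add_outputs(modular_pipeline_id, free_outputs)
self._add_children(modular_pipeline_id, sub_pipeline.nodes)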

Comment on lines 20 to 30
def _hash(value: str):
    return hashlib.sha1(value.encode("UTF-8")).hexdigest()[:8]


def _hash_input_output(item: str) -> str:
    """Hash the input/output dataset."""
    return (
        _hash(_strip_transcoding(item))
        if TRANSCODING_SEPARATOR in item
        else _hash(item)
    )
Member:

I would put these in utils and import them here, rather than the transcoding logic. Hide as much as possible behind a smaller API footprint.
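
A sketch of the suggested move; the module path and the _strip_transcoding body are assumptions, not the final diff:

# utils.py (hypothetical location)
import hashlib

TRANSCODING_SEPARATOR = "@"  # assumption: the separator used for transcoding


def _strip_transcoding(element: str) -> str:
    """Drop the transcoding suffix, e.g. "data@pandas" -> "data"."""
    return element.split(TRANSCODING_SEPARATOR)[0]


def _hash(value: str) -> str:
    return hashlib.sha1(value.encode("UTF-8")).hexdigest()[:8]


def _hash_input_output(item: str) -> str:
    """Hash an input/output dataset name, ignoring any transcoding suffix."""
    return _hash(_strip_transcoding(item)) if TRANSCODING_SEPARATOR in item else _hash(item)


# The repository module then only needs:
# from kedro_viz.utils import _hash, _hash_input_output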

    self.tree[modular_pipeline_id].internal_outputs.add(output_node.id)
else:
    self.tree[modular_pipeline_id].external_outputs.add(output_node.id)

def _add_children(self, modular_pipeline_id: str, kedro_nodes: List[Node]):
Member:

This whole function is overly complicated: a very wide-open if condition and too much nesting. Could you try to refactor it into something more concise and easier to follow?

Comment on lines 192 to 198
for kedro_node in kedro_nodes:
    # add kedro node as a child to the modular pipeline
    if kedro_node.namespace == modular_pipeline_id:
        kedro_node_id = _hash(str(kedro_node))
        modular_pipeline.children.add(
            ModularPipelineChild(id=kedro_node_id, type=GraphNodeType.TASK)
        )
Member:

Instead of nesting this with 2 extra levels, you could do something like:

children = [n for n in kedro_nodes if n.namespace == modular_pipeline_id]
modular_pipeline.children = [
    ModularPipelineChild(id=_hash(str(n)), type=GraphNodeType.TASK)
    for n in children
]

Much more readable and explainable:

  1. We get all the nodes matching exactly the pipeline namespace
  2. Add them as children to the pipeline (this could be .extend instead of = if there are other places that add to children before this one)

Two actions, two lines of code...

Comment on lines 213 to 235
# The line `parent_modular_pipeline_id = ('.'.join(modular_pipeline_id.split('.')[:-1])
# if '.' in modular_pipeline_id else None)` is extracting the parent modular pipeline ID
# from the given modular pipeline ID.
parent_modular_pipeline_id = (
    ".".join(modular_pipeline_id.split(".")[:-1])
    if "." in modular_pipeline_id
    else None
)

# Add the node's registered pipelines to the modular pipeline's registered pipelines.
# Basically this means if the node belongs to the "__default__" pipeline, for example,
# so does the modular pipeline.
modular_pipeline.pipelines.update(node.pipelines)

if parent_modular_pipeline_id:
    parent_modular_pipeline = self.get_or_create_modular_pipeline(
        parent_modular_pipeline_id
    )
    parent_modular_pipeline.pipelines.update(modular_pipeline.pipelines)
    parent_modular_pipeline.tags.update(modular_pipeline.tags)

modular_pipeline.tags.update(node.tags)

# add the current modular pipeline and the input/output datasets
# of the modular pipeline as children to the parent modular
# pipeline based on the rules
self._add_children_to_parent_pipeline(
    parent_modular_pipeline,
    modular_pipeline_id,
    modular_pipeline_inputs_outputs,
Member:

This is very unintuitive: the method is called _add_children, but in it we have the totally unrelated code for adding the modular pipeline itself to its parent's children. Could this be factored out as a separate method that is called right after _add_children instead?
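
A sketch of the suggested split, with a hypothetical name for the factored-out method:

def _add_children(self, modular_pipeline_id: str, kedro_nodes: List[Node]) -> None:
    """Only add the task nodes in this namespace as children."""
    ...

def _add_pipeline_to_parent(self, modular_pipeline_id: str) -> None:
    # Hypothetical name: register this modular pipeline (and its
    # input/output datasets) as a child of its parent pipeline.
    ...

# The call site then reads as two separate, clearly named steps:
# self._add_children(modular_pipeline_id, sub_pipeline.nodes)
# self._add_pipeline_to_parent(modular_pipeline_id)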

Comment on lines 285 to 287
if dataset not in self.node_mod_pipeline_map:
    self.node_mod_pipeline_map[dataset] = set()
self.node_mod_pipeline_map[dataset].add(modular_pipeline_id)
Member:

defaultdict to the rescue here...
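
A self-contained sketch of the suggested pattern, with dummy values standing in for the repository state:

from collections import defaultdict

dataset, modular_pipeline_id = "model_input_table", "data_processing"  # dummies

# Before: manual initialization on first access
node_mod_pipeline_map = {}
if dataset not in node_mod_pipeline_map:
    node_mod_pipeline_map[dataset] = set()
node_mod_pipeline_map[dataset].add(modular_pipeline_id)

# After: defaultdict(set) creates the empty set automatically
node_mod_pipeline_map = defaultdict(set)
node_mod_pipeline_map[dataset].add(modular_pipeline_id)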

Comment on lines 314 to 316
if io_id not in self.node_mod_pipeline_map:
    self.node_mod_pipeline_map[io_id] = set()
self.node_mod_pipeline_map[io_id].add(modular_pipeline.id)
Member:

defaultdict...

Comment on lines 307 to 310
if (
    io_id not in modular_pipeline_inputs_outputs
    and io_id not in self.parameters
):
Member:

This filter could have just been added to the previous for-comprehension, or as a separate one on its own, and then we don't need the extra nesting.
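
A sketch of folding the filter into a comprehension, with dummy data in place of the repository attributes:

# Dummy stand-ins for the real attributes
io_ids = {"a@1", "b@2", "c@3"}
modular_pipeline_inputs_outputs = {"a@1"}
parameters = {"b@2"}

# The nested if becomes part of the comprehension itself:
remaining = {
    io_id
    for io_id in io_ids
    if io_id not in modular_pipeline_inputs_outputs and io_id not in parameters
}
assert remaining == {"c@3"}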

"""
# Determine the parent modular pipeline ID
parent_modular_pipeline_id = (
Member:

I'd simply call this parent_id; variables have context, and in this function the only thing that can be a parent is a modular pipeline, so it's implied and doesn't need to be part of the variable name.

modular_pipeline_inputs_outputs,
modular_pipeline.tags.update(node.tags)

hashed_io_ids = {
Member:

I would name this just io_ids.

@merelcht (Member) left a comment:

Left some final minor comments, but nothing blocking!

def add_inputs(self, modular_pipeline_id: str, inputs: Set[str]) -> None:
    """
    Add input datasets to the modular pipeline.
Member:

I still think it's a good idea to describe the following code in the docstring:

if is_dataset_param(_input):
    self.parameters.add(hashed_input)

    modular_pipeline_inputs_outputs: Set[str],
):
    """
    Helper to add modular_pipeline children correctly to parent modular pipelines
Member:

Suggested change:
- Helper to add modular_pipeline children correctly to parent modular pipelines
+ Helper to add modular pipeline children correctly to parent modular pipelines

@rashidakanchwala deleted the chore/modular-pipeline-refactor-repositories branch on July 3, 2024, 08:24