Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion): fix datajob patcher #10827

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 14 additions & 44 deletions metadata-ingestion/src/datahub/specific/datajob.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,45 +378,29 @@ def set_output_datasets(self, outputs: List[Edge]) -> "DataJobPatchBuilder":
)
return self

def add_input_dataset_field(
self, input: Union[Edge, Urn, str]
) -> "DataJobPatchBuilder":
def add_input_dataset_field(self, input: Union[Urn, str]) -> "DataJobPatchBuilder":
"""
Adds an input dataset field to the DataJobPatchBuilder.

Args:
input: The input dataset field, which can be an Edge object, Urn object, or a string.
input: The input dataset field, which can be an Urn object, or a string.

Returns:
The DataJobPatchBuilder instance.

Raises:
ValueError: If the input is not a Schema Field urn.

Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
input_edge: Edge = input
elif isinstance(input, (Urn, str)):
input_urn = str(input)
if not input_urn.startswith("urn:li:schemaField:"):
raise ValueError(f"Input {input} is not a Schema Field urn")

input_edge = Edge(
destinationUrn=input_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)
input_urn = str(input)
urn = Urn.create_from_string(input_urn)
if not urn.get_type() == "schemaField":
raise ValueError(f"Input {input} is not a Schema Field urn")

self._ensure_urn_type("schemaField", [input_edge], "add_input_dataset_field")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/inputDatasetFields/{self.quote(input_urn)}",
value=input_edge,
value={},
)
return self

Expand Down Expand Up @@ -467,44 +451,30 @@ def set_input_dataset_fields(self, inputs: List[Edge]) -> "DataJobPatchBuilder":
return self

def add_output_dataset_field(
self, output: Union[Edge, Urn, str]
self, output: Union[Urn, str]
) -> "DataJobPatchBuilder":
"""
Adds an output dataset field to the DataJobPatchBuilder.

Args:
output: The output dataset field, which can be an Edge object, Urn object, or a string.
output: The output dataset field, which can be an Urn object, or a string.

Returns:
The DataJobPatchBuilder instance.

Raises:
ValueError: If the output is not a Schema Field urn.

Notes:
If `output` is an Edge object, it is used directly. If `output` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
"""
if isinstance(output, Edge):
output_urn: str = output.destinationUrn
output_edge: Edge = output
elif isinstance(output, (Urn, str)):
output_urn = str(output)
if not output_urn.startswith("urn:li:schemaField:"):
raise ValueError(f"Input {input} is not a Schema Field urn")

output_edge = Edge(
destinationUrn=output_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)
output_urn = str(output)
urn = Urn.create_from_string(output_urn)
if not urn.get_type() == "schemaField":
raise ValueError(f"Input {output} is not a Schema Field urn")

self._ensure_urn_type("schemaField", [output_edge], "add_output_dataset_field")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/outputDatasetFields/{self.quote(output_urn)}",
value=output_edge,
david-leifker marked this conversation as resolved.
Show resolved Hide resolved
value={},
)
return self

Expand Down
Loading