Skip to content

Commit

Permalink
fix(ingestion): fix datajob patcher (#10827)
Browse files (browse the repository at this point in the history)
  • Loading branch information
david-leifker authored Jul 3, 2024
1 parent b8af2b9 commit ea6bc61
Showing 1 changed file with 14 additions and 44 deletions.
58 changes: 14 additions & 44 deletions metadata-ingestion/src/datahub/specific/datajob.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -378,45 +378,29 @@ def set_output_datasets(self, outputs: List[Edge]) -> "DataJobPatchBuilder":
)
return self

def add_input_dataset_field(
self, input: Union[Edge, Urn, str]
) -> "DataJobPatchBuilder":
def add_input_dataset_field(self, input: Union[Urn, str]) -> "DataJobPatchBuilder":
"""
Adds an input dataset field to the DataJobPatchBuilder.
Args:
input: The input dataset field, which can be an Edge object, Urn object, or a string.
input: The input dataset field, which can be an Urn object, or a string.
Returns:
The DataJobPatchBuilder instance.
Raises:
ValueError: If the input is not a Schema Field urn.
Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
input_edge: Edge = input
elif isinstance(input, (Urn, str)):
input_urn = str(input)
if not input_urn.startswith("urn:li:schemaField:"):
raise ValueError(f"Input {input} is not a Schema Field urn")

input_edge = Edge(
destinationUrn=input_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)
input_urn = str(input)
urn = Urn.create_from_string(input_urn)
if not urn.get_type() == "schemaField":
raise ValueError(f"Input {input} is not a Schema Field urn")

self._ensure_urn_type("schemaField", [input_edge], "add_input_dataset_field")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/inputDatasetFields/{self.quote(input_urn)}",
value=input_edge,
value={},
)
return self

Expand Down Expand Up @@ -467,44 +451,30 @@ def set_input_dataset_fields(self, inputs: List[Edge]) -> "DataJobPatchBuilder":
return self

def add_output_dataset_field(
self, output: Union[Edge, Urn, str]
self, output: Union[Urn, str]
) -> "DataJobPatchBuilder":
"""
Adds an output dataset field to the DataJobPatchBuilder.
Args:
output: The output dataset field, which can be an Edge object, Urn object, or a string.
output: The output dataset field, which can be an Urn object, or a string.
Returns:
The DataJobPatchBuilder instance.
Raises:
ValueError: If the output is not a Schema Field urn.
Notes:
If `output` is an Edge object, it is used directly. If `output` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
"""
if isinstance(output, Edge):
output_urn: str = output.destinationUrn
output_edge: Edge = output
elif isinstance(output, (Urn, str)):
output_urn = str(output)
if not output_urn.startswith("urn:li:schemaField:"):
raise ValueError(f"Input {input} is not a Schema Field urn")

output_edge = Edge(
destinationUrn=output_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)
output_urn = str(output)
urn = Urn.create_from_string(output_urn)
if not urn.get_type() == "schemaField":
raise ValueError(f"Input {output} is not a Schema Field urn")

self._ensure_urn_type("schemaField", [output_edge], "add_output_dataset_field")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/outputDatasetFields/{self.quote(output_urn)}",
value=output_edge,
value={},
)
return self

Expand Down

0 comments on commit ea6bc61

Please sign in to comment.