Skip to content

Commit

Permalink
updates per review
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Jul 3, 2024
1 parent f467964 commit 8d83476
Showing 1 changed file with 14 additions and 38 deletions.
52 changes: 14 additions & 38 deletions metadata-ingestion/src/datahub/specific/datajob.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,39 +379,27 @@ def set_output_datasets(self, outputs: List[Edge]) -> "DataJobPatchBuilder":
return self

def add_input_dataset_field(
self, input: Union[Edge, Urn, str]
self, input: Union[Urn, str]
) -> "DataJobPatchBuilder":
"""
Adds an input dataset field to the DataJobPatchBuilder.
Args:
input: The input dataset field, which can be an Edge object, Urn object, or a string.
input: The input dataset field, which can be an Urn object or a string.
Returns:
The DataJobPatchBuilder instance.
Raises:
ValueError: If the input is not a Schema Field urn.
Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
input_edge: Edge = input
elif isinstance(input, (Urn, str)):
input_urn = str(input)
if not input_urn.startswith("urn:li:schemaField:"):
raise ValueError(f"Input {input} is not a Schema Field urn")

input_edge = Edge(
destinationUrn=input_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
input_urn = str(input)
urn = Urn.create_from_string(input_urn)
if not urn.get_type() == "schemaField":
raise ValueError(
f"Input {input} is not a Schema Field urn"
)

self._ensure_urn_type("schemaField", [input_edge], "add_input_dataset_field")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
Expand Down Expand Up @@ -467,39 +455,27 @@ def set_input_dataset_fields(self, inputs: List[Edge]) -> "DataJobPatchBuilder":
return self

def add_output_dataset_field(
self, output: Union[Edge, Urn, str]
self, output: Union[Urn, str]
) -> "DataJobPatchBuilder":
"""
Adds an output dataset field to the DataJobPatchBuilder.
Args:
output: The output dataset field, which can be an Edge object, Urn object, or a string.
output: The output dataset field, which can be an Urn object or a string.
Returns:
The DataJobPatchBuilder instance.
Raises:
ValueError: If the output is not a Schema Field urn.
Notes:
If `output` is an Edge object, it is used directly. If `output` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
"""
if isinstance(output, Edge):
output_urn: str = output.destinationUrn
output_edge: Edge = output
elif isinstance(output, (Urn, str)):
output_urn = str(output)
if not output_urn.startswith("urn:li:schemaField:"):
raise ValueError(f"Input {input} is not a Schema Field urn")

output_edge = Edge(
destinationUrn=output_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
output_urn = str(output)
urn = Urn.create_from_string(output_urn)
if not urn.get_type() == "schemaField":
raise ValueError(
f"Input {output} is not a Schema Field urn"
)

self._ensure_urn_type("schemaField", [output_edge], "add_output_dataset_field")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
Expand Down

0 comments on commit 8d83476

Please sign in to comment.