From 6fa9512c00db7dc442bee0ca05225a808b6eb283 Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Wed, 3 Jul 2024 11:47:08 -0500
Subject: [PATCH] fix(ingestion): fix datajob patcher (#10827)

---
 .../src/datahub/specific/datajob.py | 58 +++++--------------
 1 file changed, 14 insertions(+), 44 deletions(-)

diff --git a/metadata-ingestion/src/datahub/specific/datajob.py b/metadata-ingestion/src/datahub/specific/datajob.py
index acbc1a860968bd..2d944edeb36403 100644
--- a/metadata-ingestion/src/datahub/specific/datajob.py
+++ b/metadata-ingestion/src/datahub/specific/datajob.py
@@ -378,45 +378,29 @@ def set_output_datasets(self, outputs: List[Edge]) -> "DataJobPatchBuilder":
         )
         return self
 
-    def add_input_dataset_field(
-        self, input: Union[Edge, Urn, str]
-    ) -> "DataJobPatchBuilder":
+    def add_input_dataset_field(self, input: Union[Urn, str]) -> "DataJobPatchBuilder":
         """
         Adds an input dataset field to the DataJobPatchBuilder.
 
         Args:
-            input: The input dataset field, which can be an Edge object, Urn object, or a string.
+            input: The input dataset field, which can be an Urn object, or a string.
 
         Returns:
             The DataJobPatchBuilder instance.
 
         Raises:
             ValueError: If the input is not a Schema Field urn.
-
-        Notes:
-            If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
-            it is converted to an Edge object and added with default audit stamps.
         """
-        if isinstance(input, Edge):
-            input_urn: str = input.destinationUrn
-            input_edge: Edge = input
-        elif isinstance(input, (Urn, str)):
-            input_urn = str(input)
-            if not input_urn.startswith("urn:li:schemaField:"):
-                raise ValueError(f"Input {input} is not a Schema Field urn")
-
-            input_edge = Edge(
-                destinationUrn=input_urn,
-                created=self._mint_auditstamp(),
-                lastModified=self._mint_auditstamp(),
-            )
+        input_urn = str(input)
+        urn = Urn.create_from_string(input_urn)
+        if not urn.get_type() == "schemaField":
+            raise ValueError(f"Input {input} is not a Schema Field urn")
 
-        self._ensure_urn_type("schemaField", [input_edge], "add_input_dataset_field")
         self._add_patch(
             DataJobInputOutput.ASPECT_NAME,
             "add",
             path=f"/inputDatasetFields/{self.quote(input_urn)}",
-            value=input_edge,
+            value={},
         )
         return self
 
@@ -467,44 +451,30 @@ def set_input_dataset_fields(self, inputs: List[Edge]) -> "DataJobPatchBuilder":
         return self
 
     def add_output_dataset_field(
-        self, output: Union[Edge, Urn, str]
+        self, output: Union[Urn, str]
     ) -> "DataJobPatchBuilder":
         """
         Adds an output dataset field to the DataJobPatchBuilder.
 
         Args:
-            output: The output dataset field, which can be an Edge object, Urn object, or a string.
+            output: The output dataset field, which can be an Urn object, or a string.
 
         Returns:
             The DataJobPatchBuilder instance.
 
         Raises:
             ValueError: If the output is not a Schema Field urn.
-
-        Notes:
-            If `output` is an Edge object, it is used directly. If `output` is a Urn object or string,
-            it is converted to an Edge object and added with default audit stamps.
         """
-        if isinstance(output, Edge):
-            output_urn: str = output.destinationUrn
-            output_edge: Edge = output
-        elif isinstance(output, (Urn, str)):
-            output_urn = str(output)
-            if not output_urn.startswith("urn:li:schemaField:"):
-                raise ValueError(f"Input {input} is not a Schema Field urn")
-
-            output_edge = Edge(
-                destinationUrn=output_urn,
-                created=self._mint_auditstamp(),
-                lastModified=self._mint_auditstamp(),
-            )
+        output_urn = str(output)
+        urn = Urn.create_from_string(output_urn)
+        if not urn.get_type() == "schemaField":
+            raise ValueError(f"Input {output} is not a Schema Field urn")
 
-        self._ensure_urn_type("schemaField", [output_edge], "add_output_dataset_field")
         self._add_patch(
             DataJobInputOutput.ASPECT_NAME,
             "add",
             path=f"/outputDatasetFields/{self.quote(output_urn)}",
-            value=output_edge,
+            value={},
         )
         return self
 