openlineage: migrate OpenLineage provider to V2 facets. (#39530)
* Migrate OpenLineage provider to V2 facets.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

* Migrate amazon to v2 facets.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

* Migrate OL-dependent providers to v2 facets.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

* Migrate Google provider to V2 facets.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

* Make migration backwards-compatible with previous OL provider versions.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

* Move V2 facet imports to the `common.compat` provider.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

* Add pre-commit hook checking that `common.compat` imports are used instead of OL v1.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

---------

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
JDarDagran authored Jul 23, 2024
1 parent 9ec9eb7 commit 0206a4c
Showing 78 changed files with 1,046 additions and 666 deletions.
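
At its core, the change swaps direct `openlineage.client` imports for the `airflow.providers.common.compat.openlineage.facet` shim, which resolves to V2 facet classes when a recent openlineage-python is installed and falls back to the V1 names otherwise. A minimal sketch of the pattern (class names are taken from the diff below; values are illustrative):

# Before: direct V1 imports from the OpenLineage client.
# from openlineage.client.facet import SqlJobFacet
# from openlineage.client.run import Dataset

# After: the compat shim exposes the V2 names regardless of client version.
from airflow.providers.common.compat.openlineage.facet import Dataset, SQLJobFacet

job_facets = {"sql": SQLJobFacet(query="SELECT 1")}
output = Dataset(namespace="s3://my-bucket", name="some/key")
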
8 changes: 8 additions & 0 deletions .pre-commit-config.yaml
@@ -388,6 +388,14 @@ repos:
    exclude: ^airflow/kubernetes/
    entry: ./scripts/ci/pre_commit/check_airflow_k8s_not_used.py
    additional_dependencies: ['rich>=12.4.4']
+  - id: check-common-compat-used-for-openlineage
+    name: Check common.compat is used for OL deprecated classes
+    language: python
+    files: ^airflow/.*\.py$
+    require_serial: true
+    exclude: ^airflow/openlineage/
+    entry: ./scripts/ci/pre_commit/check_common_compat_used_for_openlineage.py
+    additional_dependencies: ['rich>=12.4.4']
  - id: check-airflow-providers-bug-report-template
    name: Check airflow-bug-report provider list is sorted/unique
    language: python
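
For illustration, a hedged sketch of the import styles the new hook is meant to catch versus allow (the actual check logic lives in the script above and is not part of this diff):

# Flagged in airflow/**/*.py (outside airflow/openlineage/): deprecated V1 imports.
# from openlineage.client.facet import SqlJobFacet
# from openlineage.client.run import Dataset

# Allowed: the same classes through the compat shim.
from airflow.providers.common.compat.openlineage.facet import Dataset, SQLJobFacet
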
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/datasets/s3.py
@@ -24,7 +24,7 @@
if TYPE_CHECKING:
from urllib.parse import SplitResult

-    from openlineage.client.run import Dataset as OpenLineageDataset
+    from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset


def create_dataset(*, bucket: str, key: str, extra=None) -> Dataset:
@@ -39,7 +39,7 @@ def sanitize_uri(uri: SplitResult) -> SplitResult:

def convert_dataset_to_openlineage(dataset: Dataset, lineage_context) -> OpenLineageDataset:
"""Translate Dataset with valid AIP-60 uri to OpenLineage with assistance from the hook."""
-    from openlineage.client.run import Dataset as OpenLineageDataset
+    from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset

bucket, key = S3Hook.parse_s3_url(dataset.uri)
return OpenLineageDataset(namespace=f"s3://{bucket}", name=key if key else "/")
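
A hedged usage sketch of the converter above (assuming the `Dataset` argument is `airflow.datasets.Dataset`; the URI is illustrative):

from airflow.datasets import Dataset

ol = convert_dataset_to_openlineage(Dataset(uri="s3://my-bucket/data/file.csv"), None)
# S3Hook.parse_s3_url splits the AIP-60 URI into bucket and key, so:
# ol.namespace == "s3://my-bucket" and ol.name == "data/file.csv"
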
33 changes: 16 additions & 17 deletions airflow/providers/amazon/aws/operators/athena.py
@@ -30,9 +30,7 @@
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields

if TYPE_CHECKING:
-    from openlineage.client.facet import BaseFacet
-    from openlineage.client.run import Dataset
-
+    from airflow.providers.common.compat.openlineage.facet import BaseFacet, Dataset, DatasetFacet
from airflow.providers.openlineage.extractors.base import OperatorLineage
from airflow.utils.context import Context

@@ -217,20 +215,19 @@ def get_openlineage_facets_on_complete(self, _) -> OperatorLineage:
path where the results are saved (user's prefix + some UUID), we are creating a dataset with the
user-provided path only. This should make it easier to match this dataset across different processes.
"""
-        from openlineage.client.facet import (
+        from airflow.providers.common.compat.openlineage.facet import (
+            Dataset,
+            Error,
             ExternalQueryRunFacet,
-            ExtractionError,
             ExtractionErrorRunFacet,
-            SqlJobFacet,
+            SQLJobFacet,
         )
-        from openlineage.client.run import Dataset

from airflow.providers.openlineage.extractors.base import OperatorLineage
from airflow.providers.openlineage.sqlparser import SQLParser

sql_parser = SQLParser(dialect="generic")

-        job_facets: dict[str, BaseFacet] = {"sql": SqlJobFacet(query=sql_parser.normalize_sql(self.query))}
+        job_facets: dict[str, BaseFacet] = {"sql": SQLJobFacet(query=sql_parser.normalize_sql(self.query))}
parse_result = sql_parser.parse(sql=self.query)

if not parse_result:
@@ -242,7 +239,7 @@ def get_openlineage_facets_on_complete(self, _) -> OperatorLineage:
totalTasks=len(self.query) if isinstance(self.query, list) else 1,
failedTasks=len(parse_result.errors),
errors=[
-                ExtractionError(
+                Error(
errorMessage=error.message,
stackTrace=None,
task=error.origin_statement,
@@ -284,13 +281,13 @@ def get_openlineage_facets_on_complete(self, _) -> OperatorLineage:
return OperatorLineage(job_facets=job_facets, run_facets=run_facets, inputs=inputs, outputs=outputs)

def get_openlineage_dataset(self, database, table) -> Dataset | None:
-        from openlineage.client.facet import (
+        from airflow.providers.common.compat.openlineage.facet import (
+            Dataset,
+            Identifier,
             SchemaDatasetFacet,
-            SchemaField,
+            SchemaDatasetFacetFields,
             SymlinksDatasetFacet,
-            SymlinksDatasetFacetIdentifiers,
         )
-        from openlineage.client.run import Dataset

client = self.hook.get_conn()
try:
Expand All @@ -301,10 +298,10 @@ def get_openlineage_dataset(self, database, table) -> Dataset | None:
# Dataset also has its physical location, which we can add in the symlink facet.
s3_location = table_metadata["TableMetadata"]["Parameters"]["location"]
parsed_path = urlparse(s3_location)
-        facets: dict[str, BaseFacet] = {
+        facets: dict[str, DatasetFacet] = {
             "symlinks": SymlinksDatasetFacet(
                 identifiers=[
-                    SymlinksDatasetFacetIdentifiers(
+                    Identifier(
namespace=f"{parsed_path.scheme}://{parsed_path.netloc}",
name=str(parsed_path.path),
type="TABLE",
@@ -313,7 +310,9 @@ def get_openlineage_dataset(self, database, table) -> Dataset | None:
)
}
fields = [
-            SchemaField(name=column["Name"], type=column["Type"], description=column.get("Comment"))
+            SchemaDatasetFacetFields(
+                name=column["Name"], type=column["Type"], description=column["Comment"]
+            )
for column in table_metadata["TableMetadata"]["Columns"]
]
if fields:
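
Putting the renamed classes together, a hedged sketch of the dataset the operator emits for an Athena table backed by S3 (namespace format, table, and location values are illustrative):

from airflow.providers.common.compat.openlineage.facet import (
    Dataset,
    Identifier,
    SchemaDatasetFacet,
    SchemaDatasetFacetFields,
    SymlinksDatasetFacet,
)

dataset = Dataset(
    namespace="awsathena://athena.eu-west-1.amazonaws.com",
    name="my_db.my_table",
    facets={
        # Physical S3 location of the table, as in get_openlineage_dataset above.
        "symlinks": SymlinksDatasetFacet(
            identifiers=[
                Identifier(namespace="s3://my-bucket", name="/warehouse/my_table", type="TABLE")
            ]
        ),
        "schema": SchemaDatasetFacet(
            fields=[SchemaDatasetFacetFields(name="id", type="bigint", description=None)]
        ),
    },
)
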
18 changes: 7 additions & 11 deletions airflow/providers/amazon/aws/operators/s3.py
@@ -324,8 +324,7 @@ def execute(self, context: Context):
)

def get_openlineage_facets_on_start(self):
-        from openlineage.client.run import Dataset
-
+        from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage

dest_bucket_name, dest_bucket_key = S3Hook.get_s3_bucket_key(
@@ -439,8 +438,7 @@ def execute(self, context: Context):
s3_hook.load_bytes(self.data, s3_key, s3_bucket, self.replace, self.encrypt, self.acl_policy)

def get_openlineage_facets_on_start(self):
-        from openlineage.client.run import Dataset
-
+        from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage

bucket, key = S3Hook.get_s3_bucket_key(self.s3_bucket, self.s3_key, "dest_bucket", "dest_key")
@@ -546,13 +544,12 @@ def execute(self, context: Context):

def get_openlineage_facets_on_complete(self, task_instance):
"""Implement _on_complete because object keys are resolved in execute()."""
-        from openlineage.client.facet import (
+        from airflow.providers.common.compat.openlineage.facet import (
+            Dataset,
             LifecycleStateChange,
             LifecycleStateChangeDatasetFacet,
-            LifecycleStateChangeDatasetFacetPreviousIdentifier,
+            PreviousIdentifier,
         )
-        from openlineage.client.run import Dataset

from airflow.providers.openlineage.extractors import OperatorLineage

if not self._keys:
@@ -570,7 +567,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
facets={
"lifecycleStateChange": LifecycleStateChangeDatasetFacet(
lifecycleStateChange=LifecycleStateChange.DROP.value,
-                    previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
+                    previousIdentifier=PreviousIdentifier(
namespace=bucket_url,
name=key,
),
@@ -725,8 +722,7 @@ def execute(self, context: Context):
self.log.info("Upload successful")

def get_openlineage_facets_on_start(self):
-        from openlineage.client.run import Dataset
-
+        from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage

dest_bucket_name, dest_bucket_key = S3Hook.get_s3_bucket_key(
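
For the delete operator above, a hedged sketch of the lifecycle facet attached to each removed key (bucket and key values are illustrative):

from airflow.providers.common.compat.openlineage.facet import (
    Dataset,
    LifecycleStateChange,
    LifecycleStateChangeDatasetFacet,
    PreviousIdentifier,
)

bucket_url, key = "s3://my-bucket", "tmp/old-file.csv"
deleted = Dataset(
    namespace=bucket_url,
    name=key,
    facets={
        "lifecycleStateChange": LifecycleStateChangeDatasetFacet(
            lifecycleStateChange=LifecycleStateChange.DROP.value,
            previousIdentifier=PreviousIdentifier(namespace=bucket_url, name=key),
        )
    },
)
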
5 changes: 2 additions & 3 deletions airflow/providers/amazon/aws/operators/sagemaker.py
@@ -46,8 +46,7 @@
from airflow.utils.json import AirflowJsonEncoder

if TYPE_CHECKING:
-    from openlineage.client.run import Dataset
-
+    from airflow.providers.common.compat.openlineage.facet import Dataset
from airflow.providers.openlineage.extractors.base import OperatorLineage
from airflow.utils.context import Context

@@ -208,7 +207,7 @@ def hook(self):

@staticmethod
def path_to_s3_dataset(path) -> Dataset:
-        from openlineage.client.run import Dataset
+        from airflow.providers.common.compat.openlineage.facet import Dataset

path = path.replace("s3://", "")
split_path = path.split("/")
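
A hedged sketch of what path_to_s3_dataset returns (assuming it is the staticmethod on the SageMaker base operator shown above; the rest of its body is collapsed in this diff, so the exact namespace/name split is inferred; the path is illustrative):

# "s3://my-bucket/models/output" is stripped of the scheme and split on "/":
dataset = SageMakerBaseOperator.path_to_s3_dataset("s3://my-bucket/models/output")
# dataset.namespace == "s3://my-bucket" and dataset.name == "models/output"
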
1 change: 1 addition & 0 deletions airflow/providers/amazon/provider.yaml
@@ -89,6 +89,7 @@ versions:

dependencies:
- apache-airflow>=2.7.0
+  - apache-airflow-providers-common-compat>=1.1.0
- apache-airflow-providers-common-sql>=1.3.1
- apache-airflow-providers-http
2 changes: 1 addition & 1 deletion airflow/providers/common/compat/__init__.py
@@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "1.0.0"
__version__ = "1.1.0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.7.0"
16 changes: 16 additions & 0 deletions airflow/providers/common/compat/openlineage/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
158 changes: 158 additions & 0 deletions airflow/providers/common/compat/openlineage/facet.py
@@ -0,0 +1,158 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING


def create_no_op(*_, **__) -> None:
"""
Create a no-op placeholder.

This function creates and returns a None value, used as a placeholder when the OpenLineage client
library is not available. It represents an action that has no effect.
"""
return None


if TYPE_CHECKING:
from openlineage.client.generated.base import (
BaseFacet,
Dataset,
DatasetFacet,
InputDataset,
OutputDataset,
RunFacet,
)
from openlineage.client.generated.column_lineage_dataset import (
ColumnLineageDatasetFacet,
Fields,
InputField,
)
from openlineage.client.generated.documentation_dataset import DocumentationDatasetFacet
from openlineage.client.generated.error_message_run import ErrorMessageRunFacet
from openlineage.client.generated.external_query_run import ExternalQueryRunFacet
from openlineage.client.generated.extraction_error_run import Error, ExtractionErrorRunFacet
from openlineage.client.generated.lifecycle_state_change_dataset import (
LifecycleStateChange,
LifecycleStateChangeDatasetFacet,
PreviousIdentifier,
)
from openlineage.client.generated.output_statistics_output_dataset import (
OutputStatisticsOutputDatasetFacet,
)
from openlineage.client.generated.schema_dataset import SchemaDatasetFacet, SchemaDatasetFacetFields
from openlineage.client.generated.sql_job import SQLJobFacet
from openlineage.client.generated.symlinks_dataset import Identifier, SymlinksDatasetFacet
else:
try:
try:
from openlineage.client.generated.base import (
BaseFacet,
Dataset,
DatasetFacet,
InputDataset,
OutputDataset,
RunFacet,
)
from openlineage.client.generated.column_lineage_dataset import (
ColumnLineageDatasetFacet,
Fields,
InputField,
)
from openlineage.client.generated.documentation_dataset import DocumentationDatasetFacet
from openlineage.client.generated.error_message_run import ErrorMessageRunFacet
from openlineage.client.generated.external_query_run import ExternalQueryRunFacet
from openlineage.client.generated.extraction_error_run import Error, ExtractionErrorRunFacet
from openlineage.client.generated.lifecycle_state_change_dataset import (
LifecycleStateChange,
LifecycleStateChangeDatasetFacet,
PreviousIdentifier,
)
from openlineage.client.generated.output_statistics_output_dataset import (
OutputStatisticsOutputDatasetFacet,
)
from openlineage.client.generated.schema_dataset import (
SchemaDatasetFacet,
SchemaDatasetFacetFields,
)
from openlineage.client.generated.sql_job import SQLJobFacet
from openlineage.client.generated.symlinks_dataset import Identifier, SymlinksDatasetFacet
except ImportError:
from openlineage.client.facet import (
BaseFacet,
BaseFacet as DatasetFacet,
BaseFacet as RunFacet,
ColumnLineageDatasetFacet,
ColumnLineageDatasetFacetFieldsAdditional as Fields,
ColumnLineageDatasetFacetFieldsAdditionalInputFields as InputField,
DocumentationDatasetFacet,
ErrorMessageRunFacet,
ExternalQueryRunFacet,
ExtractionError as Error,
ExtractionErrorRunFacet,
LifecycleStateChange,
LifecycleStateChangeDatasetFacet,
LifecycleStateChangeDatasetFacetPreviousIdentifier as PreviousIdentifier,
OutputStatisticsOutputDatasetFacet,
SchemaDatasetFacet,
SchemaField as SchemaDatasetFacetFields,
SqlJobFacet as SQLJobFacet,
SymlinksDatasetFacet,
SymlinksDatasetFacetIdentifiers as Identifier,
)
from openlineage.client.run import Dataset, InputDataset, OutputDataset
except ImportError:
# When no openlineage client library installed we create no-op classes.
# This allows avoiding raising ImportError when making OL imports in top-level code
# (which shouldn't be the case anyway).
BaseFacet = Dataset = DatasetFacet = InputDataset = OutputDataset = RunFacet = (
ColumnLineageDatasetFacet
) = Fields = InputField = DocumentationDatasetFacet = ErrorMessageRunFacet = ExternalQueryRunFacet = (
Error
) = ExtractionErrorRunFacet = LifecycleStateChange = LifecycleStateChangeDatasetFacet = (
PreviousIdentifier
) = OutputStatisticsOutputDatasetFacet = SchemaDatasetFacet = SchemaDatasetFacetFields = (
SQLJobFacet
) = Identifier = SymlinksDatasetFacet = create_no_op

__all__ = [
"BaseFacet",
"Dataset",
"DatasetFacet",
"InputDataset",
"OutputDataset",
"RunFacet",
"ColumnLineageDatasetFacet",
"Fields",
"InputField",
"DocumentationDatasetFacet",
"ErrorMessageRunFacet",
"ExternalQueryRunFacet",
"Error",
"ExtractionErrorRunFacet",
"LifecycleStateChange",
"LifecycleStateChangeDatasetFacet",
"PreviousIdentifier",
"OutputStatisticsOutputDatasetFacet",
"SchemaDatasetFacet",
"SchemaDatasetFacetFields",
"SQLJobFacet",
"Identifier",
"SymlinksDatasetFacet",
]
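
A hedged sketch of how the shim above degrades across environments (behavior read off the nested try/except chain):

from airflow.providers.common.compat.openlineage.facet import SQLJobFacet

facet = SQLJobFacet(query="SELECT 1")
# - new client installed: V2 class from openlineage.client.generated.sql_job
# - old client installed: V1 SqlJobFacet re-exported under the V2 name
# - no client installed:  every name resolves to create_no_op, so `facet` is None
#   and top-level imports never raise ImportError
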