From 8274a5448f98720316eb3b178bea0a3bb79ef55c Mon Sep 17 00:00:00 2001 From: Sven Thoms <21118431+shalberd@users.noreply.github.com> Date: Mon, 15 Jan 2024 19:55:49 +0100 Subject: [PATCH] Add package connector support for Airflow 2.x The check for subclasses of BaseOperator uses an outdated package name. This commit adds the new one from Airflow 2. Signed-off-by: Sven Thoms <21118431+shalberd@users.noreply.github.com> --- .../airflow_package_catalog_connector.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/elyra/pipeline/airflow/package_catalog_connector/airflow_package_catalog_connector.py b/elyra/pipeline/airflow/package_catalog_connector/airflow_package_catalog_connector.py index 23f5b6b13..e8a65b7d0 100644 --- a/elyra/pipeline/airflow/package_catalog_connector/airflow_package_catalog_connector.py +++ b/elyra/pipeline/airflow/package_catalog_connector/airflow_package_catalog_connector.py @@ -161,7 +161,7 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str # # Identify Python scripts that define classes that extend the - # airflow.models.BaseOperator class + # BaseOperator abstract class in module airflow.models.baseoperator # scripts_with_operator_class: List[str] = [] # Python scripts that contain operator definitions extends_baseoperator: List[str] = [] # Classes that extend BaseOperator @@ -188,7 +188,10 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str elif isinstance(node, ast.ImportFrom): node_module = node.module for name in node.names: - if "airflow.models" == node_module and name.name == "BaseOperator": + if ( + node_module in ["airflow.models", "airflow.models.baseoperator"] + and name.name == "BaseOperator" + ): imported_operator_classes.append(name.name) elif isinstance(node, ast.ClassDef): # determine whether this class extends the BaseOperator class