From 7f31b857e0297a8f17f02483fc4e2ba6134d33d0 Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Tue, 22 Jul 2025 22:30:55 +0300 Subject: [PATCH 1/2] added both different namespace for taskmanager or for global namespace --- .../providers/apache/flink/sensors/flink_kubernetes.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py b/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py index ad0cc1cfa54fc..4d86aa3d5ae06 100644 --- a/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py +++ b/providers/apache/flink/src/airflow/providers/apache/flink/sensors/flink_kubernetes.py @@ -62,6 +62,7 @@ def __init__( api_group: str = "flink.apache.org", api_version: str = "v1beta1", plural: str = "flinkdeployments", + taskmanager_pods_namespace: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -73,6 +74,7 @@ def __init__( self.api_group = api_group self.api_version = api_version self.plural = plural + self.taskmanager_pods_namespace = taskmanager_pods_namespace def _log_driver(self, application_state: str, response: dict) -> None: log_method = self.log.error if application_state in self.FAILURE_STATES else self.log.info @@ -88,7 +90,9 @@ def _log_driver(self, application_state: str, response: dict) -> None: task_manager_labels = status_info["taskManager"]["labelSelector"] all_pods = self.hook.get_namespaced_pod_list( - namespace="default", watch=False, label_selector=task_manager_labels + namespace=self.taskmanager_pods_namespace or self.namespace or "default", + watch=False, + label_selector=task_manager_labels, ) namespace = response["metadata"]["namespace"] From 977b354d601a431a11addd25fbe019e109f45daa Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Wed, 23 Jul 2025 20:25:36 +0300 Subject: [PATCH 2/2] added tests --- .../flink/sensors/test_flink_kubernetes.py | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py b/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py index 0ac117caa3287..744d82f290d63 100644 --- a/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py +++ b/providers/apache/flink/tests/unit/apache/flink/sensors/test_flink_kubernetes.py @@ -1143,6 +1143,76 @@ def test_driver_logging_completed( name="flink-stream-example", ) + @patch( + "kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object", + return_value=TEST_READY_CLUSTER, + ) + @patch("logging.Logger.info") + @patch( + "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod_logs", + return_value=TEST_POD_LOGS, + ) + @patch( + "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_namespaced_pod_list", + return_value=TASK_MANAGER_POD_LIST, + ) + def test_logging_taskmanager_from_taskmanager_namespace_when_namespace_is_set( + self, mock_namespaced_pod_list, mock_pod_logs, info_log_call, mock_namespaced_crd, mock_kube_conn + ): + namespace = "different-namespace123456" + namespae_name = "test123" + + sensor = FlinkKubernetesSensor( + application_name="flink-stream-example", + namespace=namespace, + taskmanager_pods_namespace=namespae_name, + attach_log=True, + dag=self.dag, + task_id="test_task_id", + ) + + sensor.poke(context=None) + + mock_namespaced_pod_list.assert_called_once_with( + namespace=namespae_name, + watch=False, + label_selector="component=taskmanager,app=flink-stream-example", + ) + + @patch( + "kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object", + return_value=TEST_READY_CLUSTER, + ) + @patch("logging.Logger.info") + @patch( + "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod_logs", + return_value=TEST_POD_LOGS, + ) + @patch( + "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_namespaced_pod_list", + return_value=TASK_MANAGER_POD_LIST, + ) + def test_logging_taskmanager_from_non_default_namespace( + self, mock_namespaced_pod_list, mock_pod_logs, info_log_call, mock_namespaced_crd, mock_kube_conn + ): + namespae_name = "test123" + + sensor = FlinkKubernetesSensor( + application_name="flink-stream-example", + namespace=namespae_name, + attach_log=True, + dag=self.dag, + task_id="test_task_id", + ) + + sensor.poke(context=None) + + mock_namespaced_pod_list.assert_called_once_with( + namespace=namespae_name, + watch=False, + label_selector="component=taskmanager,app=flink-stream-example", + ) + @patch( "kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object", return_value=TEST_READY_CLUSTER,