diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/base.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/base.py index 0b389535007d7..63b492ce911af 100644 --- a/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/base.py +++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/hooks/base.py @@ -88,7 +88,7 @@ def test_connection(self) -> tuple[bool, str]: """Test Connectivity from the UI.""" try: config = self.get_connection(self.kafka_config_id).extra_dejson - t = AdminClient(config, timeout=10).list_topics() + t = AdminClient(config).list_topics(timeout=10) if t: return True, "Connection successful." except Exception as e: diff --git a/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_base.py b/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_base.py index c1ca9544b8f9b..5701a789b70e6 100644 --- a/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_base.py +++ b/providers/apache/kafka/tests/unit/apache/kafka/hooks/test_base.py @@ -56,7 +56,9 @@ def test_test_connection(self, mock_get_connection, admin_client, hook): config = {"bootstrap.servers": MagicMock()} mock_get_connection.return_value.extra_dejson = config connection = hook.test_connection() - admin_client.assert_called_once_with(config, timeout=10) + admin_client.assert_called_once_with(config) + mock_admin_instance = admin_client.return_value + mock_admin_instance.list_topics.assert_called_once_with(timeout=TIMEOUT) assert connection == (True, "Connection successful.") @mock.patch( @@ -68,7 +70,9 @@ def test_test_connection_no_topics(self, mock_get_connection, admin_client, hook config = {"bootstrap.servers": MagicMock()} mock_get_connection.return_value.extra_dejson = config connection = hook.test_connection() - admin_client.assert_called_once_with(config, timeout=TIMEOUT) + admin_client.assert_called_once_with(config) + mock_admin_instance = admin_client.return_value + mock_admin_instance.list_topics.assert_called_once_with(timeout=TIMEOUT) assert connection == (False, "Failed to establish connection.") @mock.patch("airflow.providers.apache.kafka.hooks.base.AdminClient")