Add aiohttp.client_exceptions.ClientConnectorError to be a retryable error in databricks provider #43080

Closed
2 tasks done
rawwar opened this issue Oct 16, 2024 · 7 comments · Fixed by #43091

Comments

@rawwar
Collaborator

rawwar commented Oct 16, 2024

Description

When there are SSL handshake issues (which are usually intermittent), all Databricks operators running in deferrable mode fail without retrying, because aiohttp.client_exceptions.ClientConnectorError is not treated as a retryable error.

As of now, we only consider aiohttp.ClientResponseError to be retryable. I would like to make aiohttp.client_exceptions.ClientConnectorError retryable as well.
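
For illustration, the check could be broadened along these lines on the async code path (a minimal sketch only, not the provider's actual _retryable_error; the status-code handling shown here is an assumption):

import aiohttp


def _retryable_error(exception: BaseException) -> bool:
    # Connection-level failures such as an aborted SSL handshake surface as
    # ClientConnectorError and are usually transient, so retry them.
    if isinstance(exception, aiohttp.ClientConnectorError):
        return True
    # Retry responses that indicate throttling or a server-side problem.
    if isinstance(exception, aiohttp.ClientResponseError):
        return exception.status == 429 or exception.status >= 500
    return False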

Use case/motivation

When the SSL handshake takes longer than the timeout (60 seconds by default), the trigger fails with the error below:


[2024-10-16, 09:27:20 UTC] {taskinstance.py:1598} ERROR - Trigger failed:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/aiohttp/connector.py", line 1098, in _wrap_create_connection
    return await self._loop.create_connection(*args, **kwargs, sock=sock)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1103, in create_connection
    transport, protocol = await self._create_connection_transport(
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1133, in _create_connection_transport
    await waiter
ConnectionAbortedError: SSL handshake is taking longer than 60.0 seconds: aborting the connection
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py", line 529, in cleanup_finished_triggers
    result = details["task"].result()
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/triggerer_job_runner.py", line 607, in run_trigger
    async for event in trigger.run():
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/triggers/databricks.py", line 86, in run
    run_state = await self.hook.a_get_run_state(self.run_id)
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks.py", line 417, in a_get_run_state
    response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 651, in _a_do_api_call
    async for attempt in self._a_get_retry_object():
  File "/usr/local/lib/python3.10/site-packages/tenacity/_asyncio.py", line 71, in __anext__
    do = self.iter(retry_state=self._retry_state)
  File "/usr/local/lib/python3.10/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 653, in _a_do_api_call
    async with request_func(
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client.py", line 1359, in __aenter__
    self._resp: _RetType = await self._coro
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client.py", line 663, in _request
    conn = await self._connector.connect(
  File "/usr/local/lib/python3.10/site-packages/aiohttp/connector.py", line 563, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "/usr/local/lib/python3.10/site-packages/aiohttp/connector.py", line 1032, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
  File "/usr/local/lib/python3.10/site-packages/aiohttp/connector.py", line 1366, in _create_direct_connection
    raise last_exc
  File "/usr/local/lib/python3.10/site-packages/aiohttp/connector.py", line 1335, in _create_direct_connection
    transp, proto = await self._wrap_create_connection(
  File "/usr/local/lib/python3.10/site-packages/aiohttp/connector.py", line 1106, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host adb-******.***REDACTED****.azuredatabricks.net:443 ssl:default [None]

And that failure is intermittent, so making this error retryable will help.

Related issues

NA

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

rawwar added the kind:feature and needs-triage labels on Oct 16, 2024
@pankajkoti
Member

+1 to making aiohttp.client_exceptions.ClientConnectorError a retryable error

rawwar self-assigned this on Oct 16, 2024
rawwar removed the needs-triage label on Oct 16, 2024
@lucafurrer
Contributor

+1 from my side as well

@lucafurrer
Contributor

By the way, we observe some problems with asyncio.exceptions.TimeoutError. In my opinion, this should be retryable as well.

@rawwar
Collaborator Author

rawwar commented Oct 17, 2024

By the way, we observe some problems with asyncio.exceptions.TimeoutError. In my opinion, this should be retryable as well.

Can you please share the exception message? I don't think the message itself matters, but it can be added.

Thinking about this, I think we could let users pass their own _retryable_error callable, which would decide whether a retry is allowed.

As of now, we only take retry_limit and retry_delay as arguments. Adding support for a custom retry predicate and an after callback should be enough to handle custom requirements:

"retry": retry_if_exception(self._retryable_error),

@lucafurrer
Contributor

By the way, we observe some problems with asyncio.exceptions.TimeoutError. In my opinion, this should be retryable as well.

Can you please share the exception message?

Thinking about this, I think we could let users pass their own _retryable_error callable, which would decide whether a retry is allowed.

As of now, we only take retry_limit and retry_delay as arguments. Adding support for a custom retry predicate and an after callback should be enough to handle custom requirements:

"retry": retry_if_exception(self._retryable_error),

Happy to share the message:

[2024-10-15, 20:07:05 CEST] {baseoperator.py:1598} ERROR - Trigger failed:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/triggerer_job_runner.py", line 529, in cleanup_finished_triggers
    result = details["task"].result()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/triggerer_job_runner.py", line 601, in run_trigger
    async for event in trigger.run():
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/databricks/triggers/databricks.py", line 86, in run
    run_state = await self.hook.a_get_run_state(self.run_id)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/databricks/hooks/databricks.py", line 417, in a_get_run_state
    response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 651, in _a_do_api_call
    async for attempt in self._a_get_retry_object():
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/_asyncio.py", line 71, in __anext__
    do = self.iter(retry_state=self._retry_state)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 653, in _a_do_api_call
    async with request_func(
  File "/home/airflow/.local/lib/python3.8/site-packages/aiohttp/client.py", line 1194, in __aenter__
    self._resp = await self._coro
  File "/home/airflow/.local/lib/python3.8/site-packages/aiohttp/client.py", line 605, in _request
    await resp.start(conn)
  File "/home/airflow/.local/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 981, in start
    self._continue = None
  File "/home/airflow/.local/lib/python3.8/site-packages/aiohttp/helpers.py", line 735, in __exit__
    raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
[2024-10-15, 20:07:05 CEST] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1599, in resume_execution
    raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
airflow.exceptions.TaskDeferralError: Trigger failure

If there is a possibility to customize the retryable errors, that could be a solution for me. I could imagine that some of the exception types depend on the cloud provider that Databricks is running on...

@lucafurrer
Contributor

However, I think a timeout should be retryable by default, as it is typically something that resolves itself a little later...

@rawwar
Collaborator Author

rawwar commented Oct 17, 2024

However, I think a timeout should be retryable by default, as it is typically something that resolves itself a little later...

@lucafurrer, I will create a separate issue and take it up later today. If you create a new issue before that, please mention me.
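
For illustration, that follow-up would roughly amount to also treating client-side timeouts as transient in the retry predicate (again a sketch under the same assumptions as above, not the provider's actual code):

import asyncio

import aiohttp


def _retryable_error(exception: BaseException) -> bool:
    # asyncio.TimeoutError is what aiohttp raises when its ClientTimeout
    # expires, as in the traceback above; treat it like a connection failure.
    if isinstance(exception, (aiohttp.ClientConnectorError, asyncio.TimeoutError)):
        return True
    # Assumed policy for HTTP responses: retry throttling (429) and 5xx.
    if isinstance(exception, aiohttp.ClientResponseError):
        return exception.status == 429 or exception.status >= 500
    return False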
