diff --git a/providers/airbyte/docs/connections.rst b/providers/airbyte/docs/connections.rst index f0c14000aa61e..50a85188cff9d 100644 --- a/providers/airbyte/docs/connections.rst +++ b/providers/airbyte/docs/connections.rst @@ -40,3 +40,11 @@ Client ID (required) Client Secret (required) The Client Secret to connect to the Airbyte server. You can find this information in the Settings / Applications page in Airbyte UI. + +Extras (optional) + Specify custom proxies in json format. + Following default requests parameters are taken into account: + + * ``proxies`` + + Example: ``{"http": "http://proxy.example.com:8080", "https": "http://proxy.example.com:8080"}`` diff --git a/providers/airbyte/pyproject.toml b/providers/airbyte/pyproject.toml index f573261181de3..743f2ccc8162a 100644 --- a/providers/airbyte/pyproject.toml +++ b/providers/airbyte/pyproject.toml @@ -59,6 +59,7 @@ requires-python = "~=3.9" dependencies = [ "apache-airflow>=2.9.0", "airbyte-api>=0.52.0", + "requests>=2.31.0", ] [dependency-groups] diff --git a/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py index fd2498532010f..8a9c14f9ba8b2 100644 --- a/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py +++ b/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py @@ -23,6 +23,7 @@ from airbyte_api import AirbyteAPI from airbyte_api.api import CancelJobRequest, GetJobRequest from airbyte_api.models import JobCreateRequest, JobStatusEnum, JobTypeEnum, SchemeClientCredentials, Security +from requests import Session from airflow.exceptions import AirflowException from airflow.hooks.base import BaseHook @@ -63,6 +64,7 @@ def get_conn_params(self, conn_id: str) -> Any: conn_params["client_id"] = conn.login conn_params["client_secret"] = conn.password conn_params["token_url"] = conn.schema or "v1/applications/token" + conn_params["proxies"] = conn.extra_dejson.get("proxies", None) return conn_params @@ -74,9 +76,15 @@ def create_api_session(self) -> AirbyteAPI: token_url=self.conn["token_url"], ) + client = None + if self.conn["proxies"]: + client = Session() + client.proxies = self.conn["proxies"] + return AirbyteAPI( server_url=self.conn["host"], security=Security(client_credentials=credentials), + client=client, ) @classmethod diff --git a/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py index f447600c78073..afbbee65f10c4 100644 --- a/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py +++ b/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py @@ -38,14 +38,19 @@ class TestAirbyteHook: Test all functions from Airbyte Hook """ + conn_type = "airbyte" airbyte_conn_id = "airbyte_conn_id_test" + airbyte_conn_id_with_proxy = "airbyte_conn_id_test_with_proxy" connection_id = "conn_test_sync" job_id = 1 + host = "http://test-airbyte:8000/public/v1/api/" + port = 8001 sync_connection_endpoint = "http://test-airbyte:8001/api/v1/connections/sync" get_job_endpoint = "http://test-airbyte:8001/api/v1/jobs/get" cancel_job_endpoint = "http://test-airbyte:8001/api/v1/jobs/cancel" health_endpoint = "http://test-airbyte:8001/api/v1/health" + _mock_proxy = {"proxies": {"http": "http://proxy:8080", "https": "https://proxy:8080"}} _mock_sync_conn_success_response_body = {"job": {"id": 1}} _mock_job_status_success_response_body = {"job": {"status": "succeeded"}} _mock_job_cancel_status = "cancelled" @@ -53,13 +58,23 @@ class TestAirbyteHook: def setup_method(self): db.merge_conn( Connection( - conn_id="airbyte_conn_id_test", - conn_type="airbyte", - host="http://test-airbyte:8000/public/v1/api/", - port=8001, + conn_id=self.airbyte_conn_id, + conn_type=self.conn_type, + host=self.host, + port=self.port, + ) + ) + db.merge_conn( + Connection( + conn_id=self.airbyte_conn_id_with_proxy, + conn_type=self.conn_type, + host=self.host, + port=self.port, + extra=self._mock_proxy, ) ) self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id) + self.hook_with_proxy = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id_with_proxy) def return_value_get_job(self, status): response = mock.Mock() @@ -202,3 +217,14 @@ def test_connection_failure(self, mock_get_health_check): status, msg = self.hook.test_connection() assert status is False assert msg == '{"message": "internal server error"}' + + def test_create_api_session_with_proxy(self): + """ + Test the creation of the API session with proxy settings. + """ + # Create a new AirbyteHook instance + hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id_with_proxy) + + # Check if the session is created correctly + assert hook.airbyte_api is not None + assert hook.airbyte_api.sdk_configuration.client.proxies == self._mock_proxy["proxies"]