Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions providers/airbyte/docs/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,11 @@ Client ID (required)
Client Secret (required)
The Client Secret to connect to the Airbyte server.
You can find this information in the Settings / Applications page in Airbyte UI.

Extras (optional)
Specify custom proxies in json format.
Following default requests parameters are taken into account:

* ``proxies``

Example: ``{"http": "http://proxy.example.com:8080", "https": "http://proxy.example.com:8080"}``
1 change: 1 addition & 0 deletions providers/airbyte/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ requires-python = "~=3.9"
dependencies = [
"apache-airflow>=2.9.0",
"airbyte-api>=0.52.0",
"requests>=2.31.0",
]

[dependency-groups]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airbyte_api import AirbyteAPI
from airbyte_api.api import CancelJobRequest, GetJobRequest
from airbyte_api.models import JobCreateRequest, JobStatusEnum, JobTypeEnum, SchemeClientCredentials, Security
from requests import Session

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
Expand Down Expand Up @@ -63,6 +64,7 @@ def get_conn_params(self, conn_id: str) -> Any:
conn_params["client_id"] = conn.login
conn_params["client_secret"] = conn.password
conn_params["token_url"] = conn.schema or "v1/applications/token"
conn_params["proxies"] = conn.extra_dejson.get("proxies", None)

return conn_params

Expand All @@ -74,9 +76,15 @@ def create_api_session(self) -> AirbyteAPI:
token_url=self.conn["token_url"],
)

client = None
if self.conn["proxies"]:
client = Session()
client.proxies = self.conn["proxies"]

return AirbyteAPI(
server_url=self.conn["host"],
security=Security(client_credentials=credentials),
client=client,
)

@classmethod
Expand Down
34 changes: 30 additions & 4 deletions providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,43 @@ class TestAirbyteHook:
Test all functions from Airbyte Hook
"""

conn_type = "airbyte"
airbyte_conn_id = "airbyte_conn_id_test"
airbyte_conn_id_with_proxy = "airbyte_conn_id_test_with_proxy"
connection_id = "conn_test_sync"
job_id = 1
host = "http://test-airbyte:8000/public/v1/api/"
port = 8001
sync_connection_endpoint = "http://test-airbyte:8001/api/v1/connections/sync"
get_job_endpoint = "http://test-airbyte:8001/api/v1/jobs/get"
cancel_job_endpoint = "http://test-airbyte:8001/api/v1/jobs/cancel"

health_endpoint = "http://test-airbyte:8001/api/v1/health"
_mock_proxy = {"proxies": {"http": "http://proxy:8080", "https": "https://proxy:8080"}}
_mock_sync_conn_success_response_body = {"job": {"id": 1}}
_mock_job_status_success_response_body = {"job": {"status": "succeeded"}}
_mock_job_cancel_status = "cancelled"

def setup_method(self):
db.merge_conn(
Connection(
conn_id="airbyte_conn_id_test",
conn_type="airbyte",
host="http://test-airbyte:8000/public/v1/api/",
port=8001,
conn_id=self.airbyte_conn_id,
conn_type=self.conn_type,
host=self.host,
port=self.port,
)
)
db.merge_conn(
Connection(
conn_id=self.airbyte_conn_id_with_proxy,
conn_type=self.conn_type,
host=self.host,
port=self.port,
extra=self._mock_proxy,
)
)
self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id)
self.hook_with_proxy = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id_with_proxy)

def return_value_get_job(self, status):
response = mock.Mock()
Expand Down Expand Up @@ -202,3 +217,14 @@ def test_connection_failure(self, mock_get_health_check):
status, msg = self.hook.test_connection()
assert status is False
assert msg == '{"message": "internal server error"}'

def test_create_api_session_with_proxy(self):
"""
Test the creation of the API session with proxy settings.
"""
# Create a new AirbyteHook instance
hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id_with_proxy)

# Check if the session is created correctly
assert hook.airbyte_api is not None
assert hook.airbyte_api.sdk_configuration.client.proxies == self._mock_proxy["proxies"]