diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 33d52dfe75a84..b718e36e7235e 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -1403,6 +1403,8 @@ api: description: | Paths to the SSL certificate and key for the api server. When both are provided SSL will be enabled. This does not change the api server port. + The same SSL certificate will also be loaded into the worker to enable + it to be trusted when a self-signed certificate is used. version_added: ~ type: string example: ~ diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index 36489d3d79d58..3413e98189cfc 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -18,11 +18,13 @@ from __future__ import annotations import logging +import ssl import sys import uuid from http import HTTPStatus from typing import TYPE_CHECKING, Any, TypeVar +import certifi import httpx import msgspec import structlog @@ -660,6 +662,7 @@ def noop_handler(request: httpx.Request) -> httpx.Response: API_RETRIES = conf.getint("workers", "execution_api_retries") API_RETRY_WAIT_MIN = conf.getfloat("workers", "execution_api_retry_wait_min") API_RETRY_WAIT_MAX = conf.getfloat("workers", "execution_api_retry_wait_max") +API_SSL_CERT_PATH = conf.get("api", "ssl_cert") class Client(httpx.Client): @@ -675,6 +678,10 @@ def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, * kwargs.setdefault("base_url", "dry-run://server") else: kwargs["base_url"] = base_url + ctx = ssl.create_default_context(cafile=certifi.where()) + if API_SSL_CERT_PATH: + ctx.load_verify_locations(API_SSL_CERT_PATH) + kwargs["verify"] = ctx pyver = f"{'.'.join(map(str, sys.version_info[:3]))}" super().__init__( auth=auth, diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py index 7fa17ba1c4d38..caa515de09a4d 100644 --- a/task-sdk/tests/task_sdk/api/test_client.py +++ b/task-sdk/tests/task_sdk/api/test_client.py @@ -79,6 +79,16 @@ def test_dry_run(self, path, json_response): assert resp.status_code == 200 assert resp.json() == json_response + @mock.patch("airflow.sdk.api.client.API_SSL_CERT_PATH", "/capath/does/not/exist/") + def test_add_capath(self): + def handle_request(request: httpx.Request) -> httpx.Response: + return httpx.Response(status_code=200) + + with pytest.raises(FileNotFoundError) as err: + make_client(httpx.MockTransport(handle_request)) + + assert isinstance(err.value, FileNotFoundError) + def test_error_parsing(self): responses = [ httpx.Response(422, json={"detail": [{"loc": ["#0"], "msg": "err", "type": "required"}]})