diff --git a/providers/microsoft/azure/docs/connections/wasb.rst b/providers/microsoft/azure/docs/connections/wasb.rst index e7e2f78b596ad..057160b30f0e9 100644 --- a/providers/microsoft/azure/docs/connections/wasb.rst +++ b/providers/microsoft/azure/docs/connections/wasb.rst @@ -27,7 +27,7 @@ The Microsoft Azure Blob Storage connection type enables the Azure Blob Storage Authenticating to Azure Blob Storage ------------------------------------ -There are six ways to connect to Azure Blob Storage using Airflow. +There are seven ways to connect to Azure Blob Storage using Airflow. 1. Use `token credentials`_ i.e. add specific credentials (client_id, secret, tenant) and subscription id to the Airflow connection. @@ -37,8 +37,9 @@ There are six ways to connect to Azure Blob Storage using Airflow. i.e. add a key config to ``sas_token`` in the Airflow connection. 4. Use a `Connection String`_ i.e. add connection string to ``connection_string`` in the Airflow connection. -5. Use managed identity by setting ``managed_identity_client_id``, ``workload_identity_tenant_id`` (under the hook, it uses DefaultAzureCredential_ with these arguments) -6. Fallback on DefaultAzureCredential_. +5. Use account key by setting ``account_key`` in the Airflow connection extra fields. +6. Use managed identity by setting ``managed_identity_client_id``, ``workload_identity_tenant_id`` (under the hood, it uses DefaultAzureCredential_ with these arguments) +7. Fall back on DefaultAzureCredential_. This includes a mechanism to try different options to authenticate: Managed System Identity, environment variables, authentication through Azure CLI, etc. Only one authorization method can be used at a time. If you need to manage multiple credentials or keys then you should @@ -84,6 +85,7 @@ Extra (optional) Specify the extra parameters (as json dictionary) that can be used in Azure connection. 
The following parameters are all optional: + * ``account_key``: Specify the account key for Azure Blob Storage authentication. This will be checked before falling back to DefaultAzureCredential_. * ``client_secret_auth_config``: Extra config to pass while authenticating as a service principal using `ClientSecretCredential`_ It can be left out to fall back on DefaultAzureCredential_. * ``managed_identity_client_id``: The client ID of a user-assigned managed identity. If provided with `workload_identity_tenant_id`, they'll pass to ``DefaultAzureCredential``. * ``workload_identity_tenant_id``: ID of the application's Microsoft Entra tenant. Also called its "directory" ID. If provided with `managed_identity_client_id`, they'll pass to ``DefaultAzureCredential``. diff --git a/providers/microsoft/azure/docs/logging/index.rst b/providers/microsoft/azure/docs/logging/index.rst index 16ed9b5c7fd07..04405eedf35b1 100644 --- a/providers/microsoft/azure/docs/logging/index.rst +++ b/providers/microsoft/azure/docs/logging/index.rst @@ -58,7 +58,7 @@ Setup Steps: '''''''''''''' #. Install the provider package with ``pip install apache-airflow-providers-microsoft-azure``. -#. Ensure :ref:`connection <howto/connection:wasb>` is already setup with read and write access to Azure Blob Storage in the ``remote_wasb_log_container`` container and path ``remote_base_log_folder``. +#. Ensure :ref:`connection <howto/connection:wasb>` is already set up with read and write access to Azure Blob Storage in the ``remote_wasb_log_container`` container and path ``remote_base_log_folder``. The connection should be configured with appropriate authentication credentials (such as account key, shared access key, or managed identity). For account key authentication, you can add ``account_key`` to the connection's extra fields as a JSON dictionary: ``{"account_key": "your_account_key"}``. #. Setup the above configuration values. Please note that the container should already exist. #. 
Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution. #. Verify that logs are showing up for newly executed tasks in the container at the specified base path you have defined. diff --git a/providers/microsoft/azure/newsfragments/51944.bugfix.rst b/providers/microsoft/azure/newsfragments/51944.bugfix.rst new file mode 100644 index 0000000000000..5706bcc088e17 --- /dev/null +++ b/providers/microsoft/azure/newsfragments/51944.bugfix.rst @@ -0,0 +1 @@ +Fix Azure Blob Storage authentication to check ``account_key`` field in connection extra before falling back to ``DefaultAzureCredential`` diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/wasb.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/wasb.py index 8eb009592a15f..7ff704e0f5e45 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/wasb.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/wasb.py @@ -67,8 +67,9 @@ class WasbHook(BaseHook): These parameters have to be passed in Airflow Data Base: account_name and account_key. Additional options passed in the 'extra' field of the connection will be - passed to the `BlockBlockService()` constructor. For example, authenticate - using a SAS token by adding {"sas_token": "YOUR_TOKEN"}. + passed to the `BlobServiceClient()` constructor. For example, authenticate + using a SAS token by adding {"sas_token": "YOUR_TOKEN"} or using an account key + by adding {"account_key": "YOUR_ACCOUNT_KEY"}. If no authentication configuration is provided, DefaultAzureCredential will be used (applicable when using Azure compute infrastructure). 
@@ -121,7 +122,7 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: "tenant_id": "tenant", "shared_access_key": "shared access key", "sas_token": "account url or token", - "extra": "additional options for use with ClientSecretCredential or DefaultAzureCredential", + "extra": "additional options for use with ClientSecretCredential, DefaultAzureCredential, or account_key authentication", }, } @@ -198,13 +199,18 @@ def get_conn(self) -> BlobServiceClient: # Fall back to old auth (password) or use managed identity if not provided. credential = conn.password if not credential: - managed_identity_client_id = self._get_field(extra, "managed_identity_client_id") - workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id") - credential = get_sync_default_azure_credential( - managed_identity_client_id=managed_identity_client_id, - workload_identity_tenant_id=workload_identity_tenant_id, - ) - self.log.info("Using DefaultAzureCredential as credential") + # Check for account_key in extra fields before falling back to DefaultAzureCredential + account_key = self._get_field(extra, "account_key") + if account_key: + credential = account_key + else: + managed_identity_client_id = self._get_field(extra, "managed_identity_client_id") + workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id") + credential = get_sync_default_azure_credential( + managed_identity_client_id=managed_identity_client_id, + workload_identity_tenant_id=workload_identity_tenant_id, + ) + self.log.info("Using DefaultAzureCredential as credential") return BlobServiceClient( account_url=account_url, credential=credential, @@ -646,13 +652,18 @@ async def get_async_conn(self) -> AsyncBlobServiceClient: # Fall back to old auth (password) or use managed identity if not provided. 
credential = conn.password if not credential: - managed_identity_client_id = self._get_field(extra, "managed_identity_client_id") - workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id") - credential = get_async_default_azure_credential( - managed_identity_client_id=managed_identity_client_id, - workload_identity_tenant_id=workload_identity_tenant_id, - ) - self.log.info("Using DefaultAzureCredential as credential") + # Check for account_key in extra fields before falling back to DefaultAzureCredential + account_key = self._get_field(extra, "account_key") + if account_key: + credential = account_key + else: + managed_identity_client_id = self._get_field(extra, "managed_identity_client_id") + workload_identity_tenant_id = self._get_field(extra, "workload_identity_tenant_id") + credential = get_async_default_azure_credential( + managed_identity_client_id=managed_identity_client_id, + workload_identity_tenant_id=workload_identity_tenant_id, + ) + self.log.info("Using DefaultAzureCredential as credential") self.blob_service_client = AsyncBlobServiceClient( account_url=account_url, credential=credential, diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_wasb.py b/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_wasb.py index 55895879fac53..dffb2eeb4bb2d 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_wasb.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/hooks/test_wasb.py @@ -79,6 +79,7 @@ def setup_method(self, create_mock_connections): self.public_read_conn_id = "pub_read_id" self.public_read_conn_id_without_host = "pub_read_id_without_host" self.managed_identity_conn_id = "managed_identity_conn_id" + self.account_key_conn_id = "account_key_conn_id" self.authority = "https://test_authority.com" self.proxies = PROXIES @@ -135,6 +136,12 @@ def setup_method(self, create_mock_connections): conn_type=self.connection_type, extra={"proxies": self.proxies}, ), + 
Connection( + conn_id=self.account_key_conn_id, + conn_type=self.connection_type, + login="testaccount", + extra={"account_key": "test_account_key", "proxies": self.proxies}, + ), Connection( conn_id="sas_conn_id", conn_type=self.connection_type, @@ -223,6 +230,16 @@ def test_azure_directory_connection(self, mocked_client_secret_credential, mocke proxies=self.proxies, ) + def test_account_key_connection(self, mocked_blob_service_client): + """Test that account_key from extra is used when no password is provided.""" + WasbHook(wasb_conn_id=self.account_key_conn_id).get_conn() + mocked_blob_service_client.assert_called_once_with( + account_url="https://testaccount.blob.core.windows.net/", + credential="test_account_key", + proxies=self.proxies, + account_key="test_account_key", + ) + @pytest.mark.parametrize( "mocked_connection", [ @@ -331,6 +348,7 @@ def test_sas_token_connection(self, conn_id_str, extra_key): "azure_shared_key_test", "ad_conn_id", "managed_identity_conn_id", + "account_key_conn_id", "sas_conn_id", "extra__wasb__sas_conn_id", "http_sas_conn_id", @@ -659,6 +677,7 @@ def test_connection_failure(self, mocked_blob_service_client): "azure_shared_key_test", "ad_conn_id", "managed_identity_conn_id", + "account_key_conn_id", "sas_conn_id", "extra__wasb__sas_conn_id", "http_sas_conn_id",