diff --git a/providers/microsoft/azure/docs/filesystems/index.rst b/providers/microsoft/azure/docs/filesystems/index.rst new file mode 100644 index 0000000000000..eb0036177a3fa --- /dev/null +++ b/providers/microsoft/azure/docs/filesystems/index.rst @@ -0,0 +1,26 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Filesystems +=========== + +.. toctree:: + :maxdepth: 1 + :caption: Filesystem Providers + :glob: + + * diff --git a/providers/microsoft/azure/docs/filesystems/msgraph.rst b/providers/microsoft/azure/docs/filesystems/msgraph.rst new file mode 100644 index 0000000000000..7ebea783b7e80 --- /dev/null +++ b/providers/microsoft/azure/docs/filesystems/msgraph.rst @@ -0,0 +1,187 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Microsoft Graph Filesystem +=========================== + +The Microsoft Graph filesystem provides access to OneDrive, SharePoint, and Teams document libraries through Airflow's ObjectStoragePath interface. + +Supported URL formats: + +* ``msgraph://connection_id/drive_id/path/to/file`` +* ``sharepoint://connection_id/drive_id/path/to/file`` +* ``onedrive://connection_id/drive_id/path/to/file`` +* ``msgd://connection_id/drive_id/path/to/file`` + +Connection Configuration +------------------------ + +Create a Microsoft Graph connection in Airflow with the following parameters: + +* **Connection Type**: msgraph +* **Host**: Tenant ID +* **Login**: Client ID +* **Password**: Client Secret + +The connection form provides additional configuration fields: + +* **Tenant ID**: Azure AD tenant identifier +* **Drive ID**: Specific drive to access (optional - leave empty for general access) +* **Scopes**: OAuth2 scopes (default: https://graph.microsoft.com/.default) + +Additional OAuth2 parameters supported via connection extras: + +* **scope**: OAuth2 access scope +* **token_endpoint**: Custom token endpoint URL +* **redirect_uri**: OAuth2 redirect URI for authorization code flow +* **token_endpoint_auth_method**: Client authentication method (default: client_secret_basic) +* **code_challenge_method**: PKCE code challenge method (e.g., 'S256') +* **username**: Username for password grant flow +* **password**: Password for password grant flow + +Connection extra field configuration example: + +.. 
code-block:: json + + { + "drive_id": "b!abc123...", + "scope": "https://graph.microsoft.com/.default", + "token_endpoint": "https://login.microsoftonline.com/your-tenant/oauth2/v2.0/token", + "redirect_uri": "http://localhost:8080/callback", + "token_endpoint_auth_method": "client_secret_post" + } + +Usage Examples +-------------- + +Reading Files +^^^^^^^^^^^^^ + +.. code-block:: python + + from airflow.sdk.io.path import ObjectStoragePath + + # Access a file in OneDrive + path = ObjectStoragePath("onedrive://my_conn/drive123/Documents/data.csv") + + # Read file content + with path.open("r") as f: + content = f.read() + +Directory Operations +^^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + # List directory contents in SharePoint + sharepoint_path = ObjectStoragePath("sharepoint://sp_conn/site_drive/Shared Documents/") + + for item in sharepoint_path.iterdir(): + print(f"Found: {item.name}") + if item.is_file(): + print(f" Size: {item.stat().st_size} bytes") + +File Operations +^^^^^^^^^^^^^^^ + +.. code-block:: python + + # Copy file between drives + source = ObjectStoragePath("msgraph://conn1/drive1/source.txt") + target = ObjectStoragePath("msgraph://conn2/drive2/backup/source.txt") + source.copy(target) + + # Move file + old_path = ObjectStoragePath("onedrive://conn/drive/temp/file.txt") + new_path = ObjectStoragePath("onedrive://conn/drive/archive/file.txt") + old_path.move(new_path) + + # Delete file + file_to_delete = ObjectStoragePath("msgraph://conn/drive/old_data.csv") + file_to_delete.unlink() + +Writing Files +^^^^^^^^^^^^^ + +.. code-block:: python + + # Write new file + output_path = ObjectStoragePath("sharepoint://sp_conn/docs/reports/report.txt") + + with output_path.open("w") as f: + f.write("Generated report data\n") + f.write(f"Created at: {datetime.now()}\n") + +Drive Discovery +^^^^^^^^^^^^^^^ + +When you need to find the correct drive ID for your URLs, you can use the Microsoft Graph API operators: + +.. 
code-block:: python + + from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator + + # List all drives for a user + list_drives = MSGraphAsyncOperator( + task_id="list_drives", + conn_id="msgraph_conn", + url="me/drives", + result_processor=lambda response: [ + {"id": drive["id"], "name": drive["name"]} for drive in response["value"] + ], + ) + +URL Scheme Mapping +------------------ + +The filesystem is registered under several URL schemes, named after the service they are typically used with: + +* ``msgraph://`` - General Microsoft Graph access +* ``onedrive://`` - OneDrive personal and business drives +* ``sharepoint://`` - SharePoint document libraries +* ``msgd://`` - Shortened form of ``msgraph://`` + +All schemes use the same underlying Microsoft Graph API and authentication. + +Requirements +------------ + +The Microsoft Graph filesystem requires: + +* ``msgraphfs`` Python package +* Valid Azure AD application registration with appropriate permissions +* Microsoft Graph API access for your tenant + +Required Microsoft Graph permissions: + +* ``Files.Read`` - To read files +* ``Files.ReadWrite`` - To read and write files +* ``Sites.Read.All`` - To access SharePoint sites (if using ``sharepoint://`` URLs) + +Cross-References +---------------- + +* :doc:`Microsoft Graph API Operators </operators/msgraph>` - For API operations and drive discovery + +Reference +--------- + +For further information, look at: + +* `Microsoft Graph Files API <https://learn.microsoft.com/en-us/graph/api/resources/onedrive>`__ +* `msgraphfs Python package <https://pypi.org/project/msgraphfs/>`__ +* `Use the Microsoft Graph API <https://learn.microsoft.com/en-us/graph/use-the-api>`__ diff --git a/providers/microsoft/azure/docs/index.rst b/providers/microsoft/azure/docs/index.rst index 18f61a50e8e2b..e551f109d1962 100644 --- a/providers/microsoft/azure/docs/index.rst +++ b/providers/microsoft/azure/docs/index.rst @@ -37,6 +37,7 @@ Connection types Operators Transfers + Filesystems <filesystems/index> Secrets backends Logging for Tasks Sensors diff --git a/providers/microsoft/azure/docs/operators/msgraph.rst b/providers/microsoft/azure/docs/operators/msgraph.rst index
52f7ff11f52ae..99815a959adeb 100644 --- a/providers/microsoft/azure/docs/operators/msgraph.rst +++ b/providers/microsoft/azure/docs/operators/msgraph.rst @@ -80,6 +80,10 @@ Below is an example of using this operator to create an item schedule in Fabric. :start-after: [START howto_operator_ms_fabric_create_item_schedule] :end-before: [END howto_operator_ms_fabric_create_item_schedule] +Cross-References +---------------- + +* :doc:`Microsoft Graph Filesystem </filesystems/msgraph>` - For file operations using ObjectStoragePath Reference --------- diff --git a/providers/microsoft/azure/provider.yaml b/providers/microsoft/azure/provider.yaml index e67c5293075ac..9c6b110ae9b81 100644 --- a/providers/microsoft/azure/provider.yaml +++ b/providers/microsoft/azure/provider.yaml @@ -225,6 +225,7 @@ sensors: filesystems: - airflow.providers.microsoft.azure.fs.adls + - airflow.providers.microsoft.azure.fs.msgraph hooks: - integration-name: Microsoft Azure Container Instances diff --git a/providers/microsoft/azure/pyproject.toml b/providers/microsoft/azure/pyproject.toml index 3bbd1d7790c51..bb876631911b0 100644 --- a/providers/microsoft/azure/pyproject.toml +++ b/providers/microsoft/azure/pyproject.toml @@ -81,6 +81,7 @@ dependencies = [ "azure-mgmt-containerregistry>=8.0.0", "azure-mgmt-containerinstance>=10.1.0", "msgraph-core>=1.3.3", + "msgraphfs>=0.3.0", "microsoft-kiota-http>=1.9.4,<2.0.0", "microsoft-kiota-serialization-json>=1.9.4", "microsoft-kiota-serialization-text>=1.9.4", diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/fs/msgraph.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/fs/msgraph.py new file mode 100644 index 0000000000000..990b74cf0d9a8 --- /dev/null +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/fs/msgraph.py @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements.
# URL schemes under which this filesystem is registered; all of them are served by ``get_fs``.
schemes = ["msgraph", "sharepoint", "onedrive", "msgd"]


def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem:
    """
    Create an ``msgraphfs`` drive filesystem configured from an Airflow connection.

    The client id, client secret and tenant id are taken from the connection's
    ``login``, ``password`` and ``host`` respectively, each falling back to the
    equally named connection extra. ``drive_id`` and the additional OAuth2
    settings are read from the extras, and ``storage_options`` is applied last
    so that it overrides anything coming from the connection.

    :param conn_id: Airflow connection id. When ``None`` the filesystem is created
        without any connection data, and ``storage_options`` is not consulted.
    :param storage_options: Optional overrides merged on top of the connection settings.
    :return: The configured ``MSGDriveFS`` instance.
    """
    from msgraphfs import MSGDriveFS

    if conn_id is None:
        return MSGDriveFS({})

    conn = BaseHook.get_connection(conn_id)
    extras = conn.extra_dejson
    conn_type = conn.conn_type or "msgraph"

    def _extra(field_name: str) -> Any:
        # Single place for the extras lookup so every field is resolved the same way.
        return get_field(conn_id=conn_id, conn_type=conn_type, extras=extras, field_name=field_name)

    credential_keys = ("client_id", "client_secret", "tenant_id")
    # Passed through to the OAuth2 client in addition to the three credentials.
    oauth2_extra_keys = (
        "scope",
        "token_endpoint",
        "redirect_uri",
        "token_endpoint_auth_method",
        "code_challenge_method",
        "update_token",
        "username",
        "password",
    )

    # Connection attributes win; the extras are only consulted when the attribute is empty.
    credentials = {
        "client_id": conn.login or _extra("client_id"),
        "client_secret": conn.password or _extra("client_secret"),
        "tenant_id": conn.host or _extra("tenant_id"),
    }
    options: dict[str, Any] = {key: value for key, value in credentials.items() if value}

    for field in ("drive_id", *oauth2_extra_keys):
        value = _extra(field)
        # Unset and blank extras are skipped, so they never reach the filesystem.
        if value is not None and value != "":
            options[field] = value

    # Caller-supplied options take precedence over everything read from the connection.
    options.update(storage_options or {})

    # Credentials are forwarded only as a complete set; a partial set is dropped entirely.
    oauth2_client_params: dict[str, Any] = {}
    if all(options.get(key) for key in credential_keys):
        oauth2_client_params.update({key: options[key] for key in credential_keys})
    oauth2_client_params.update({key: options[key] for key in oauth2_extra_keys if key in options})

    return MSGDriveFS(drive_id=options.get("drive_id"), oauth2_client_params=oauth2_client_params)
"airflow.providers.microsoft.azure.fs.adls", + "airflow.providers.microsoft.azure.fs.msgraph", + ], "hooks": [ { "integration-name": "Microsoft Azure Container Instances", diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/msgraph.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/msgraph.py index 958a3b8f437e7..5435fd5593a6e 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/msgraph.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/msgraph.py @@ -152,6 +152,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: return { "tenant_id": StringField(lazy_gettext("Tenant ID"), widget=BS3TextFieldWidget()), + "drive_id": StringField(lazy_gettext("Drive ID"), widget=BS3TextFieldWidget()), "api_version": StringField( lazy_gettext("API Version"), widget=BS3TextFieldWidget(), default=APIVersion.v1.value ), diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/fs/test_msgraph.py b/providers/microsoft/azure/tests/unit/microsoft/azure/fs/test_msgraph.py new file mode 100644 index 0000000000000..235219b6dcb30 --- /dev/null +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/fs/test_msgraph.py @@ -0,0 +1,155 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied.
# Patch targets shared by every test below.
_GET_CONNECTION_TARGET = "airflow.providers.microsoft.azure.fs.msgraph.BaseHook.get_connection"
_DRIVE_FS_TARGET = "msgraphfs.MSGDriveFS"


@pytest.fixture
def mock_connection():
    """Connection carrying full credentials plus a ``drive_id`` extra."""
    return Connection(
        conn_id="msgraph_default",
        conn_type="msgraph",
        login="test_client_id",
        password="test_client_secret",
        host="test_tenant_id",
        extra={"drive_id": "test_drive_id"},
    )


@pytest.fixture
def mock_connection_minimal():
    """Connection carrying full credentials and no extras."""
    return Connection(
        conn_id="msgraph_minimal",
        conn_type="msgraph",
        login="test_client_id",
        password="test_client_secret",
        host="test_tenant_id",
    )


class TestMSGraphFS:
    """Checks the arguments ``get_fs`` hands to ``MSGDriveFS`` for different connections."""

    def test_get_fs_with_drive_id(self, mock_connection):
        with patch(_DRIVE_FS_TARGET) as drive_fs_cls:
            with patch(_GET_CONNECTION_TARGET, return_value=mock_connection):
                filesystem = get_fs("msgraph_default")

        drive_fs_cls.assert_called_once_with(
            drive_id="test_drive_id",
            oauth2_client_params={
                "client_id": "test_client_id",
                "client_secret": "test_client_secret",
                "tenant_id": "test_tenant_id",
            },
        )
        assert filesystem == drive_fs_cls.return_value

    def test_get_fs_no_connection(self):
        with patch(_DRIVE_FS_TARGET) as drive_fs_cls:
            filesystem = get_fs(None)

        drive_fs_cls.assert_called_once_with({})
        assert filesystem == drive_fs_cls.return_value

    def test_get_fs_with_extra_oauth_params(self):
        oauth_extras = {
            "scope": "https://graph.microsoft.com/.default",
            "token_endpoint": "https://login.microsoftonline.com/test/oauth2/v2.0/token",
            "redirect_uri": "http://localhost:8080/callback",
        }
        connection = Connection(
            conn_id="msgraph_extra",
            conn_type="msgraph",
            login="test_client_id",
            password="test_client_secret",
            host="test_tenant_id",
            extra={"drive_id": "test_drive_id", **oauth_extras},
        )

        with patch(_DRIVE_FS_TARGET) as drive_fs_cls:
            with patch(_GET_CONNECTION_TARGET, return_value=connection):
                filesystem = get_fs("msgraph_extra")

        # Every OAuth2 extra must be forwarded next to the three credentials.
        drive_fs_cls.assert_called_once_with(
            drive_id="test_drive_id",
            oauth2_client_params={
                "client_id": "test_client_id",
                "client_secret": "test_client_secret",
                "tenant_id": "test_tenant_id",
                **oauth_extras,
            },
        )
        assert filesystem == drive_fs_cls.return_value

    def test_get_fs_with_storage_options(self, mock_connection_minimal):
        overrides = {"drive_id": "storage_drive_id", "scope": "custom.scope"}

        with patch(_DRIVE_FS_TARGET) as drive_fs_cls:
            with patch(_GET_CONNECTION_TARGET, return_value=mock_connection_minimal):
                filesystem = get_fs("msgraph_minimal", storage_options=overrides)

        drive_fs_cls.assert_called_once_with(
            drive_id="storage_drive_id",
            oauth2_client_params={
                "client_id": "test_client_id",
                "client_secret": "test_client_secret",
                "tenant_id": "test_tenant_id",
                "scope": "custom.scope",
            },
        )
        assert filesystem == drive_fs_cls.return_value

    def test_get_fs_incomplete_credentials(self):
        # The client secret is deliberately left out of this connection.
        connection = Connection(
            conn_id="msgraph_incomplete",
            conn_type="msgraph",
            login="test_client_id",
            host="test_tenant_id",
        )

        with patch(_DRIVE_FS_TARGET) as drive_fs_cls:
            with patch(_GET_CONNECTION_TARGET, return_value=connection):
                filesystem = get_fs("msgraph_incomplete")

        # An incomplete credential set must yield a filesystem without OAuth2 parameters.
        drive_fs_cls.assert_called_once_with(drive_id=None, oauth2_client_params={})
        assert filesystem == drive_fs_cls.return_value