Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions providers/microsoft/azure/docs/filesystems/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Filesystems
===========

.. toctree::
:maxdepth: 1
:caption: Filesystem Providers
:glob:

*
187 changes: 187 additions & 0 deletions providers/microsoft/azure/docs/filesystems/msgraph.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Microsoft Graph Filesystem
===========================

The Microsoft Graph filesystem provides access to OneDrive, SharePoint, and Teams document libraries through Airflow's ObjectStoragePath interface.

Supported URL formats:

* ``msgraph://connection_id/drive_id/path/to/file``
* ``sharepoint://connection_id/drive_id/path/to/file``
* ``onedrive://connection_id/drive_id/path/to/file``
* ``msgd://connection_id/drive_id/path/to/file``

Connection Configuration
------------------------

Create a Microsoft Graph connection in Airflow with the following parameters:

* **Connection Type**: msgraph
* **Host**: Tenant ID
* **Login**: Client ID
* **Password**: Client Secret

The connection form provides additional configuration fields:

* **Tenant ID**: Azure AD tenant identifier
* **Drive ID**: Specific drive to access (optional - leave empty for general access)
* **Scopes**: OAuth2 scopes (default: https://graph.microsoft.com/.default)

Additional OAuth2 parameters supported via connection extras:

* **scope**: OAuth2 access scope
* **token_endpoint**: Custom token endpoint URL
* **redirect_uri**: OAuth2 redirect URI for authorization code flow
* **token_endpoint_auth_method**: Client authentication method (default: client_secret_basic)
* **code_challenge_method**: PKCE code challenge method (e.g., 'S256')
* **username**: Username for password grant flow
* **password**: Password for password grant flow

Connection extra field configuration example:

.. code-block:: json

{
"drive_id": "b!abc123...",
"scope": "https://graph.microsoft.com/.default",
"token_endpoint": "https://login.microsoftonline.com/your-tenant/oauth2/v2.0/token",
"redirect_uri": "http://localhost:8080/callback",
"token_endpoint_auth_method": "client_secret_post"
}

Usage Examples
--------------

Reading Files
^^^^^^^^^^^^^

.. code-block:: python

from airflow.sdk.io.path import ObjectStoragePath

# Access a file in OneDrive
path = ObjectStoragePath("onedrive://my_conn/drive123/Documents/data.csv")

# Read file content
with path.open("r") as f:
content = f.read()

Directory Operations
^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

# List directory contents in SharePoint
sharepoint_path = ObjectStoragePath("sharepoint://sp_conn/site_drive/Shared Documents/")

for item in sharepoint_path.iterdir():
print(f"Found: {item.name}")
if item.is_file():
print(f" Size: {item.stat().st_size} bytes")

File Operations
^^^^^^^^^^^^^^^

.. code-block:: python

# Copy file between drives
source = ObjectStoragePath("msgraph://conn1/drive1/source.txt")
target = ObjectStoragePath("msgraph://conn2/drive2/backup/source.txt")
source.copy(target)

# Move file
old_path = ObjectStoragePath("onedrive://conn/drive/temp/file.txt")
new_path = ObjectStoragePath("onedrive://conn/drive/archive/file.txt")
old_path.move(new_path)

# Delete file
file_to_delete = ObjectStoragePath("msgraph://conn/drive/old_data.csv")
file_to_delete.unlink()

Writing Files
^^^^^^^^^^^^^

.. code-block:: python

# Write new file
output_path = ObjectStoragePath("sharepoint://sp_conn/docs/reports/report.txt")

with output_path.open("w") as f:
f.write("Generated report data\n")
f.write(f"Created at: {datetime.now()}\n")

Drive Discovery
^^^^^^^^^^^^^^^

When you need to find the correct drive ID for your URLs, you can use the Microsoft Graph API operators:

.. code-block:: python

from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator

# List all drives for a user
list_drives = MSGraphAsyncOperator(
task_id="list_drives",
conn_id="msgraph_conn",
url="me/drives",
result_processor=lambda response: [
{"id": drive["id"], "name": drive["name"]} for drive in response["value"]
],
)

URL Scheme Mapping
------------------

The different URL schemes map to specific Microsoft Graph endpoints:

* ``msgraph://`` - General Microsoft Graph access
* ``onedrive://`` - OneDrive personal and business drives
* ``sharepoint://`` - SharePoint document libraries
* ``msgd://`` - Shortened form of msgraph://

All schemes use the same underlying Microsoft Graph API and authentication.

Requirements
------------

The Microsoft Graph filesystem requires:

* ``msgraphfs`` Python package
* Valid Azure AD application registration with appropriate permissions
* Microsoft Graph API access for your tenant

Required Microsoft Graph permissions:

* ``Files.Read`` - To read files
* ``Files.ReadWrite`` - To read and write files
* ``Sites.Read.All`` - To access SharePoint sites (if using ``sharepoint://`` URLs)

Cross-References
----------------

* :doc:`Microsoft Graph API Operators </operators/msgraph>` - For API operations and drive discovery

Reference
---------

For further information, look at:

* `Microsoft Graph Files API <https://learn.microsoft.com/en-us/graph/api/resources/onedrive>`__
* `msgraphfs Python package <https://pypi.org/project/msgraphfs/>`__
* `Use the Microsoft Graph API <https://learn.microsoft.com/en-us/graph/use-the-api/>`__
1 change: 1 addition & 0 deletions providers/microsoft/azure/docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
Connection types <connections/index>
Operators <operators/index>
Transfers <transfer/index>
Filesystems <filesystems/index>
Secrets backends <secrets-backends/azure-key-vault>
Logging for Tasks <logging/index>
Sensors <sensors/index>
Expand Down
4 changes: 4 additions & 0 deletions providers/microsoft/azure/docs/operators/msgraph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ Below is an example of using this operator to create an item schedule in Fabric.
:start-after: [START howto_operator_ms_fabric_create_item_schedule]
:end-before: [END howto_operator_ms_fabric_create_item_schedule]

Cross-References
----------------

* :doc:`Microsoft Graph Filesystem </filesystems/msgraph>` - For file operations using ObjectStoragePath

Reference
---------
Expand Down
1 change: 1 addition & 0 deletions providers/microsoft/azure/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ sensors:

filesystems:
- airflow.providers.microsoft.azure.fs.adls
- airflow.providers.microsoft.azure.fs.msgraphfs

hooks:
- integration-name: Microsoft Azure Container Instances
Expand Down
1 change: 1 addition & 0 deletions providers/microsoft/azure/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ dependencies = [
"azure-mgmt-containerregistry>=8.0.0",
"azure-mgmt-containerinstance>=10.1.0",
"msgraph-core>=1.3.3",
"msgraphfs>=0.3.0",
"microsoft-kiota-http>=1.9.4,<2.0.0",
"microsoft-kiota-serialization-json>=1.9.4",
"microsoft-kiota-serialization-text>=1.9.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from airflow.providers.microsoft.azure.utils import get_field
from airflow.providers.microsoft.azure.version_compat import BaseHook

if TYPE_CHECKING:
from fsspec import AbstractFileSystem

schemes = ["msgraph", "sharepoint", "onedrive", "msgd"]


def get_fs(conn_id: str | None, storage_options: dict[str, Any] | None = None) -> AbstractFileSystem:
from msgraphfs import MSGDriveFS

if conn_id is None:
return MSGDriveFS({})

conn = BaseHook.get_connection(conn_id)
extras = conn.extra_dejson
conn_type = conn.conn_type or "msgraph"

options: dict[str, Any] = {}

# Get authentication parameters with fallback handling
client_id = conn.login or get_field(
conn_id=conn_id, conn_type=conn_type, extras=extras, field_name="client_id"
)
client_secret = conn.password or get_field(
conn_id=conn_id, conn_type=conn_type, extras=extras, field_name="client_secret"
)
tenant_id = conn.host or get_field(
conn_id=conn_id, conn_type=conn_type, extras=extras, field_name="tenant_id"
)

if client_id:
options["client_id"] = client_id
if client_secret:
options["client_secret"] = client_secret
if tenant_id:
options["tenant_id"] = tenant_id

# Process additional fields from extras
fields = [
"drive_id",
"scope",
"token_endpoint",
"redirect_uri",
"token_endpoint_auth_method",
"code_challenge_method",
"update_token",
"username",
"password",
]
for field in fields:
value = get_field(conn_id=conn_id, conn_type=conn_type, extras=extras, field_name=field)
if value is not None:
if value == "":
options.pop(field, "")
else:
options[field] = value

# Update with storage options
options.update(storage_options or {})

# Create oauth2 client parameters if authentication is provided
oauth2_client_params = {}
if options.get("client_id") and options.get("client_secret") and options.get("tenant_id"):
oauth2_client_params = {
"client_id": options["client_id"],
"client_secret": options["client_secret"],
"tenant_id": options["tenant_id"],
}

# Add additional oauth2 parameters supported by authlib
oauth2_params = [
"scope",
"token_endpoint",
"redirect_uri",
"token_endpoint_auth_method",
"code_challenge_method",
"update_token",
"username",
"password",
]
for param in oauth2_params:
if param in options:
oauth2_client_params[param] = options[param]

# Determine which filesystem to return based on drive_id
drive_id = options.get("drive_id")

return MSGDriveFS(drive_id=drive_id, oauth2_client_params=oauth2_client_params)
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ def get_provider_info():
"python-modules": ["airflow.providers.microsoft.azure.sensors.msgraph"],
},
],
"filesystems": ["airflow.providers.microsoft.azure.fs.adls"],
"filesystems": [
"airflow.providers.microsoft.azure.fs.adls",
"airflow.providers.microsoft.azure.fs.msgraphfs",
],
"hooks": [
{
"integration-name": "Microsoft Azure Container Instances",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]:

return {
"tenant_id": StringField(lazy_gettext("Tenant ID"), widget=BS3TextFieldWidget()),
"drive_id": StringField(lazy_gettext("Drive ID"), widget=BS3TextFieldWidget()),
"api_version": StringField(
lazy_gettext("API Version"), widget=BS3TextFieldWidget(), default=APIVersion.v1.value
),
Expand Down
Loading