
Commit 94f3589

[Data] Cherry-pick: Add enhanced support for Unity Catalog (#57954) (#58049)
## Description

This PR adds support for reading Unity Catalog Delta tables in Ray Data with automatic credential vending. This enables secure, temporary access to Delta Lake tables stored in Databricks Unity Catalog without requiring users to manage cloud credentials manually.

### What's Added

- **`ray.data.read_unity_catalog()`** - Updated public API for reading Unity Catalog Delta tables
- **`UnityCatalogConnector`** - Handles Unity Catalog REST API integration and credential vending
- **Multi-cloud support** - Works with AWS S3, Azure Data Lake Storage, and Google Cloud Storage
- **Automatic credential management** - Obtains temporary, least-privilege credentials via the Unity Catalog API
- **Delta Lake integration** - Properly configures the PyArrow filesystem for Delta tables with session tokens

### Key Features

- ✅ **Production-ready credential vending API** - Uses stable, public Unity Catalog APIs
- ✅ **Secure by default** - Temporary credentials with automatic cleanup
- ✅ **Multi-cloud** - AWS (S3), Azure (Blob Storage), and GCP (Cloud Storage)
- ✅ **Delta Lake optimized** - Handles session tokens and PyArrow filesystem configuration
- ✅ **Comprehensive error handling** - Helpful messages for common issues (deletion vectors, permissions, etc.)
- ✅ **Full logging support** - Debug and info logging throughout

### Usage Example

```python
import ray

# Read a Unity Catalog Delta table
ds = ray.data.read_unity_catalog(
    table="main.sales.transactions",
    url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com",
    token="dapi...",
    region="us-west-2",  # Optional, for AWS
)

# Use standard Ray Data operations
ds = ds.filter(lambda row: row["amount"] > 100)
ds.show(5)
```

### Implementation Notes

This is a **simplified, focused implementation** that:

- Supports **Unity Catalog tables only** (no volumes - that's in private preview)
- Assumes **Delta Lake format** (the most common Unity Catalog use case)
- Uses **production-ready APIs** only (no private preview features)
- Provides ~600 lines of clean, reviewable code

The full implementation with volumes and multi-format support is available in the `data_uc_volumes` branch and can be added in a future PR once this foundation is reviewed.

### Testing

- ✅ All ruff lint checks pass
- ✅ Code formatted per Ray standards
- ✅ Tested with real Unity Catalog Delta tables on AWS S3
- ✅ Proper PyArrow filesystem configuration verified
- ✅ Credential vending flow validated

## Related issues

Related to Unity Catalog and Delta Lake support requests in Ray Data.

## Additional information

### Architecture

The implementation follows the **connector pattern** rather than a `Datasource` subclass because Unity Catalog is a metadata/credential layer, not a data format. The connector:

1. Fetches table metadata from the Unity Catalog REST API
2. Obtains temporary credentials via the credential vending API
3. Configures cloud-specific environment variables
4. Delegates to `ray.data.read_delta()` with the proper filesystem configuration

### Delta Lake Special Handling

Delta Lake on AWS requires an explicit PyArrow S3FileSystem configured with session tokens (environment variables alone are insufficient). This implementation correctly creates and passes the filesystem object to the `deltalake` library, as sketched below.
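A minimal sketch of that handling, mirroring the `aws_temp_credentials` field names used in this PR; the credential values, region, and table path below are placeholders, not real data:

```python
import pyarrow.fs as pafs
import ray

# Placeholder values standing in for credentials vended by Unity Catalog.
aws_creds = {
    "access_key_id": "ASIA...",
    "secret_access_key": "...",
    "session_token": "...",
}

# The session token must be passed through an explicit PyArrow S3FileSystem;
# setting environment variables alone is not enough for deltalake on S3.
filesystem = pafs.S3FileSystem(
    access_key=aws_creds["access_key_id"],
    secret_key=aws_creds["secret_access_key"],
    session_token=aws_creds["session_token"],
    region="us-west-2",  # region is required for S3 access
)

# Delegate to the standard Delta reader with the configured filesystem.
ds = ray.data.read_delta("s3://bucket/path/to/table", filesystem=filesystem)
```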
### Cloud Provider Support

| Provider | Credential Type | Implementation |
|----------|----------------|----------------|
| AWS S3 | Temporary IAM credentials | PyArrow S3FileSystem with session token |
| Azure Blob | SAS tokens | Environment variables (AZURE_STORAGE_SAS_TOKEN) |
| GCP Cloud Storage | OAuth tokens / Service account | Environment variables (GCP_OAUTH_TOKEN, GOOGLE_APPLICATION_CREDENTIALS) |

### Error Handling

Comprehensive error messages for common issues:

- **Deletion Vectors**: Guidance on upgrading the deltalake library or disabling the feature
- **Column Mapping**: Compatibility information and solutions
- **Permissions**: Clear list of required Unity Catalog permissions
- **Credential issues**: Detailed troubleshooting steps

### Future Enhancements

Potential follow-up PRs:

- Unity Catalog volumes support (when out of private preview)
- Multi-format support (Parquet, CSV, JSON, images, etc.)
- Custom datasource integration
- Advanced Delta Lake features (time travel, partition filters)

### Dependencies

- Requires the `deltalake` package for Delta Lake support
- Uses standard Ray Data APIs (`read_delta`, `read_datasource`)
- Integrates with the existing PyArrow filesystem infrastructure

### Documentation

- Full docstrings with examples
- Type hints throughout
- Inline comments with references to external documentation
- Comprehensive error messages with actionable guidance

---------

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
Co-authored-by: soffer-anyscale <173827098+soffer-anyscale@users.noreply.github.com>
1 parent 0e6b21a commit 94f3589

File tree

3 files changed: +95 −303 lines changed


python/ray/data/_internal/datasource/uc_datasource.py

Lines changed: 68 additions & 10 deletions
```diff
@@ -1,3 +1,4 @@
+import atexit
 import os
 import tempfile
 from typing import Any, Callable, Dict, Optional
@@ -42,6 +43,7 @@ def __init__(
         self.operation = operation
         self.ray_init_kwargs = ray_init_kwargs or {}
         self.reader_kwargs = reader_kwargs or {}
+        self._gcp_temp_file = None
 
     def _get_table_info(self) -> dict:
         url = f"{self.base_url}/api/2.1/unity-catalog/tables/{self.table_full_name}"
@@ -81,12 +83,17 @@ def _set_env(self):
             env_vars["AZURE_STORAGE_SAS_TOKEN"] = creds["azuresasuri"]
         elif "gcp_service_account" in creds:
             gcp_json = creds["gcp_service_account"]
-            with tempfile.NamedTemporaryFile(
-                prefix="gcp_sa_", suffix=".json", delete=True
-            ) as temp_file:
-                temp_file.write(gcp_json.encode())
-                temp_file.flush()
-                env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name
+            temp_file = tempfile.NamedTemporaryFile(
+                mode="w",
+                prefix="gcp_sa_",
+                suffix=".json",
+                delete=False,
+            )
+            temp_file.write(gcp_json)
+            temp_file.close()
+            env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name
+            self._gcp_temp_file = temp_file.name
+            atexit.register(self._cleanup_gcp_temp_file, temp_file.name)
         else:
             raise ValueError(
                 "No known credential type found in Databricks UC response."
@@ -96,6 +103,15 @@ def _set_env(self):
             os.environ[k] = v
         self._runtime_env = {"env_vars": env_vars}
 
+    @staticmethod
+    def _cleanup_gcp_temp_file(temp_file_path: str):
+        """Clean up temporary GCP service account file."""
+        if temp_file_path and os.path.exists(temp_file_path):
+            try:
+                os.unlink(temp_file_path)
+            except OSError:
+                pass
+
     def _infer_data_format(self) -> str:
         if self.data_format:
             return self.data_format
@@ -121,17 +137,59 @@ def _get_ray_reader(self, data_format: str) -> Callable[..., Any]:
             return reader_func
         raise ValueError(f"Unsupported data format: {fmt}")
 
+    def _read_delta_with_credentials(self):
+        """Read Delta table with proper PyArrow filesystem for session tokens."""
+        import pyarrow.fs as pafs
+
+        creds = self._creds_response
+        reader_kwargs = self.reader_kwargs.copy()
+
+        # For AWS, create PyArrow S3FileSystem with session tokens
+        if "aws_temp_credentials" in creds:
+            if not self.region:
+                raise ValueError(
+                    "The 'region' parameter is required for AWS S3 access. "
+                    "Please specify the AWS region (e.g., region='us-west-2')."
+                )
+            aws = creds["aws_temp_credentials"]
+            filesystem = pafs.S3FileSystem(
+                access_key=aws["access_key_id"],
+                secret_key=aws["secret_access_key"],
+                session_token=aws["session_token"],
+                region=self.region,
+            )
+            reader_kwargs["filesystem"] = filesystem
+
+        # Call ray.data.read_delta with proper error handling
+        try:
+            return ray.data.read_delta(self._table_url, **reader_kwargs)
+        except Exception as e:
+            error_msg = str(e)
+            if (
+                "DeletionVectors" in error_msg
+                or "Unsupported reader features" in error_msg
+            ):
+                raise RuntimeError(
+                    f"Delta table uses Deletion Vectors, which requires deltalake>=0.10.0. "
+                    f"Error: {error_msg}\n"
+                    f"Solution: pip install --upgrade 'deltalake>=0.10.0'"
+                ) from e
+            raise
+
     def read(self):
         self._get_table_info()
         self._get_creds()
         self._set_env()
 
         data_format = self._infer_data_format()
-        reader = self._get_ray_reader(data_format)
 
         if not ray.is_initialized():
            ray.init(runtime_env=self._runtime_env, **self.ray_init_kwargs)
 
-        url = self._table_url
-        ds = reader(url, **self.reader_kwargs)
-        return ds
+        # Use special Delta reader for proper filesystem handling
+        if data_format == "delta":
+            return self._read_delta_with_credentials()
+
+        # Use standard reader for other formats
+        reader = self._get_ray_reader(data_format)
+        return reader(self._table_url, **self.reader_kwargs)
```
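For context on the GCP branch of the diff above: the old code wrote the service-account JSON to a `NamedTemporaryFile` with `delete=True` inside a `with` block, so the file disappeared as soon as the block exited and `GOOGLE_APPLICATION_CREDENTIALS` pointed at a missing path. A minimal standalone sketch of the corrected pattern (the `sa_json` payload and helper name are hypothetical, not part of this PR):

```python
import atexit
import os
import tempfile


def write_gcp_credentials(sa_json: str) -> str:
    """Persist a service-account JSON payload and register cleanup at process exit."""
    # delete=False keeps the file on disk after close() so later readers can use it.
    temp_file = tempfile.NamedTemporaryFile(
        mode="w", prefix="gcp_sa_", suffix=".json", delete=False
    )
    temp_file.write(sa_json)
    temp_file.close()

    def _cleanup(path: str) -> None:
        # Remove the credentials file when the process exits; ignore races.
        try:
            os.unlink(path)
        except OSError:
            pass

    atexit.register(_cleanup, temp_file.name)
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name
    return temp_file.name
```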

python/ray/data/_internal/datasource/unity_catalog_datasource.py

Lines changed: 0 additions & 237 deletions
This file was deleted.
