[BUG] Autodetect AWS region during deltalake scan (#3104)
We already do this when we read from S3, but delta-rs does not, so its metadata reads fail. This is especially an issue for Unity Catalog tables, where the region is not specified anywhere.

Tested locally and it works, but I'm unsure how to cover this in unit tests, since the behavior is specific to Unity Catalog + AWS.

Fix for #2903
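
For context, a rough sketch of the user-facing effect, assuming the daft.read_deltalake API; the table URI and credentials below are placeholders and are not taken from the commit. Previously delta-rs needed region_name to be set explicitly for its metadata reads; with this change the bucket's region is detected automatically:

    import daft
    from daft.io import IOConfig, S3Config

    # Placeholder URI and credentials; region_name is intentionally omitted.
    # The scan now looks up the bucket's region before handing the config to delta-rs.
    df = daft.read_deltalake(
        "s3://my-bucket/path/to/delta-table",
        io_config=IOConfig(s3=S3Config(key_id="...", access_key="...")),
    )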
kevinzwang authored Oct 23, 2024
1 parent 459ba82 commit 0727dc1
Showing 3 changed files with 42 additions and 13 deletions.
19 changes: 19 additions & 0 deletions daft/delta_lake/delta_lake_scan.py
@@ -17,6 +17,7 @@
     ScanTask,
     StorageConfig,
 )
+from daft.io.aws_config import boto3_client_from_s3_config
 from daft.io.object_store_options import io_config_to_storage_options
 from daft.io.scan import PartitionField, ScanOperator
 from daft.logical.schema import Schema
@@ -43,6 +44,24 @@ def __init__(
         deltalake_sdk_io_config = storage_config.config.io_config
         scheme = urlparse(table_uri).scheme
         if scheme == "s3" or scheme == "s3a":
+            # Try to get region from boto3
+            if deltalake_sdk_io_config.s3.region_name is None:
+                from botocore.exceptions import BotoCoreError
+
+                try:
+                    client = boto3_client_from_s3_config("s3", deltalake_sdk_io_config.s3)
+                    response = client.get_bucket_location(Bucket=urlparse(table_uri).netloc)
+                except BotoCoreError as e:
+                    logger.warning(
+                        "Failed to get the S3 bucket region using existing storage config, will attempt to get it from the environment instead. Error from boto3: %s",
+                        e,
+                    )
+                else:
+                    deltalake_sdk_io_config = deltalake_sdk_io_config.replace(
+                        s3=deltalake_sdk_io_config.s3.replace(region_name=response["LocationConstraint"])
+                    )
+
+            # Try to get config from the environment
             if any([deltalake_sdk_io_config.s3.key_id is None, deltalake_sdk_io_config.s3.region_name is None]):
                 try:
                     s3_config_from_env = S3Config.from_env()
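
To illustrate the new branch above, here is a standalone sketch of the same best-effort region lookup, not part of the commit; the bucket name is a placeholder:

    import boto3
    from botocore.exceptions import BotoCoreError

    def detect_bucket_region(bucket: str):
        """Best-effort S3 bucket region lookup; returns None when it cannot be determined."""
        try:
            response = boto3.client("s3").get_bucket_location(Bucket=bucket)
        except BotoCoreError:
            return None  # caller falls back to the environment, as the scan does
        # Note: get_bucket_location reports LocationConstraint as None for us-east-1 buckets.
        return response["LocationConstraint"]

    print(detect_bucket_region("my-bucket"))  # e.g. "us-west-2"

If the lookup fails, the scan falls back to S3Config.from_env(), which fills in whatever region and credentials the AWS environment provides.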
21 changes: 21 additions & 0 deletions daft/io/aws_config.py
@@ -0,0 +1,21 @@
+from typing import TYPE_CHECKING
+
+from daft.daft import S3Config
+
+if TYPE_CHECKING:
+    import boto3
+
+
+def boto3_client_from_s3_config(service: str, s3_config: S3Config) -> "boto3.client":
+    import boto3
+
+    return boto3.client(
+        service,
+        region_name=s3_config.region_name,
+        use_ssl=s3_config.use_ssl,
+        verify=s3_config.verify_ssl,
+        endpoint_url=s3_config.endpoint_url,
+        aws_access_key_id=s3_config.key_id,
+        aws_secret_access_key=s3_config.access_key,
+        aws_session_token=s3_config.session_token,
+    )
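
For illustration, a small usage sketch of this helper, not part of the commit; the S3Config values are placeholders and assume the keyword arguments match the field names used above:

    from daft.daft import S3Config
    from daft.io.aws_config import boto3_client_from_s3_config

    # Placeholder credentials; fields left as None fall through to boto3's own defaults.
    cfg = S3Config(region_name="us-west-2", key_id="...", access_key="...")

    s3 = boto3_client_from_s3_config("s3", cfg)      # used by the Delta Lake scan
    glue = boto3_client_from_s3_config("glue", cfg)  # used by the Glue data catalog path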
15 changes: 2 additions & 13 deletions daft/io/catalog.py
@@ -5,6 +5,7 @@
 from typing import Optional
 
 from daft.daft import IOConfig
+from daft.io.aws_config import boto3_client_from_s3_config
 
 
 class DataCatalogType(Enum):
@@ -42,20 +43,8 @@ def table_uri(self, io_config: IOConfig) -> str:
         """
         if self.catalog == DataCatalogType.GLUE:
             # Use boto3 to get the table from AWS Glue Data Catalog.
-            import boto3
+            glue = boto3_client_from_s3_config("glue", io_config.s3)
 
-            s3_config = io_config.s3
-
-            glue = boto3.client(
-                "glue",
-                region_name=s3_config.region_name,
-                use_ssl=s3_config.use_ssl,
-                verify=s3_config.verify_ssl,
-                endpoint_url=s3_config.endpoint_url,
-                aws_access_key_id=s3_config.key_id,
-                aws_secret_access_key=s3_config.access_key,
-                aws_session_token=s3_config.session_token,
-            )
             if self.catalog_id is not None:
                 # Allow cross account access, table.catalog_id should be the target account id
                 glue_table = glue.get_table(
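
The hunk is truncated at the get_table call. Purely for illustration, and not taken from the commit, a sketch of a cross-account Glue table lookup with boto3, with placeholder account, database, and table names:

    import boto3

    glue = boto3.client("glue", region_name="us-west-2")

    # CatalogId selects the AWS account that owns the catalog, enabling cross-account access.
    glue_table = glue.get_table(
        CatalogId="123456789012",
        DatabaseName="my_database",
        Name="my_table",
    )
    # A Glue table's S3 location lives under Table.StorageDescriptor.Location.
    print(glue_table["Table"]["StorageDescriptor"]["Location"])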
