BSE-4358: Python S3 Table support #96

Merged · 14 commits · Jan 2, 2025
1 change: 1 addition & 0 deletions .github/workflows/_test_python_source.yml
@@ -51,6 +51,7 @@ jobs:
role-to-assume: arn:aws:iam::427443013497:role/BodoEngineNightlyRole
role-session-name: BodoEnginePrCiSession
role-skip-session-tagging: true
role-duration-seconds: 10800

# Run Tests
- name: Run Tests
3 changes: 2 additions & 1 deletion bodo/io/iceberg.py
@@ -131,11 +131,12 @@ def format_iceberg_conn(conn_str: str) -> str:
"iceberg+abfs",
"iceberg+abfss",
"iceberg+rest",
"iceberg+arn",
):
raise BodoError(
"'con' must start with one of the following: 'iceberg://', 'iceberg+file://', "
"'iceberg+s3://', 'iceberg+thrift://', 'iceberg+http://', 'iceberg+https://', 'iceberg+glue', 'iceberg+snowflake://', "
"'iceberg+abfs://', 'iceberg+abfss://', 'iceberg+rest://'"
"'iceberg+abfs://', 'iceberg+abfss://', 'iceberg+rest://', 'iceberg+arn'"
)

# Remove Iceberg Prefix when using Internally
2 changes: 1 addition & 1 deletion bodo/tests/conftest.py
@@ -279,7 +279,6 @@ def minio_server():
# For compatibility with older MinIO versions.
os.environ["MINIO_ACCESS_KEY"] = access_key
os.environ["MINIO_SECRET_KEY"] = secret_key
os.environ["AWS_S3_ENDPOINT"] = f"http://{address}/"

args = [
"minio",
@@ -313,6 +312,7 @@ def minio_server_with_s3_envs(minio_server: tuple[str, str, str]):
"AWS_ACCESS_KEY_ID": minio_server[0],
"AWS_SECRET_ACCESS_KEY": minio_server[1],
"AWS_SESSION_TOKEN": None,
"AWS_S3_ENDPOINT": f"http://{minio_server[2]}/",
}
):
yield minio_server
132 changes: 132 additions & 0 deletions bodo/tests/test_s3_tables_iceberg.py
@@ -0,0 +1,132 @@
import random
import string
from io import StringIO

import boto3
import pandas as pd
import pytest

import bodo
from bodo.tests.user_logging_utils import (
check_logger_msg,
create_string_io_logger,
set_logging_stream,
)
from bodo.tests.utils import (
_get_dist_arg,
check_func,
run_rank0,
temp_env_override,
)

pytestmark = pytest.mark.iceberg

bucket_arn = "arn:aws:s3tables:us-east-2:427443013497:bucket/unittest-bucket"


@temp_env_override({"AWS_REGION": "us-east-2"})
def test_basic_read(memory_leak_check):
"""
    Test reading a complete Iceberg table from S3 Tables
"""

def impl(table_name, conn, db_schema):
return pd.read_sql_table(table_name, conn, db_schema)

py_out = pd.DataFrame(
{
"A": ["ally", "bob", "cassie", "david", None],
"B": [10.5, -124.0, 11.11, 456.2, -8e2],
"C": [True, None, False, None, None],
}
)

conn = "iceberg+" + bucket_arn
check_func(
impl,
("bodo_iceberg_read_test", conn, "read_namespace"),
py_output=py_out,
sort_output=True,
reset_index=True,
)


@temp_env_override({"AWS_REGION": "us-east-2"})
def test_read_implicit_pruning(memory_leak_check):
"""
Test reading an Iceberg table from S3 Tables with Bodo
compiler column pruning
"""

def impl(table_name, conn, db_schema):
df = pd.read_sql_table(table_name, conn, db_schema)
df["B"] = df["B"].abs()
return df[["B", "A"]]

py_out = pd.DataFrame(
{
"B": [10.5, 124.0, 11.11, 456.2, 8e2],
"A": ["ally", "bob", "cassie", "david", None],
}
)

conn = "iceberg+" + bucket_arn
stream = StringIO()
logger = create_string_io_logger(stream)
with set_logging_stream(logger, 1):
check_func(
impl,
("bodo_iceberg_read_test", conn, "read_namespace"),
py_output=py_out,
sort_output=True,
reset_index=True,
)
check_logger_msg(stream, "Columns loaded ['A', 'B']")


@temp_env_override({"AWS_REGION": "us-east-2"})
@temp_env_override({"AWS_DEFAULT_REGION": "us-east-2"})
def test_basic_write(memory_leak_check):
"""
Test writing a complete Iceberg table to S3 Tables
"""

@bodo.jit(distributed=["df"])
def write(df, table_name, conn, db_schema):
df.to_sql(table_name, conn, db_schema)

def read(table_name, conn, db_schema):
return pd.read_sql_table(table_name, conn, db_schema)

df = pd.DataFrame(
{
"A": ["ally", "bob", "cassie", "david", None] * 5,
"B": [10.5, -124.0, 11.11, 456.2, -8e2] * 5,
"C": [True, None, False, None, None] * 5,
}
)
conn = "iceberg+" + bucket_arn
    table_name = f"bodo_iceberg_write_test_{''.join(random.choices(string.ascii_lowercase, k=4))}"

try:
write(_get_dist_arg(df), table_name, conn, "write_namespace")

check_func(
read,
(table_name, conn, "write_namespace"),
py_output=df,
sort_output=True,
reset_index=True,
)
finally:

def cleanup():
client = boto3.client("s3tables")
client.delete_table(
name=table_name,
namespace="write_namespace",
tableBucketARN=bucket_arn,
)
client.close()

        # run_rank0 returns a wrapper, so it must be called to execute cleanup on rank 0
        run_rank0(cleanup)()
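For context, a hedged sketch of how the read-side fixture table (`bodo_iceberg_read_test` in `read_namespace`) might be provisioned with the boto3 S3 Tables client. The method and parameter names follow the AWS S3 Tables API as I understand it, and the fixture in this PR was created outside the test, so treat this as an assumption rather than part of the change:

```python
import boto3

# Assumed provisioning calls; mirrors the delete_table cleanup used in the test above.
client = boto3.client("s3tables", region_name="us-east-2")
client.create_namespace(
    tableBucketARN=bucket_arn,
    namespace=["read_namespace"],
)
client.create_table(
    tableBucketARN=bucket_arn,
    namespace="read_namespace",
    name="bodo_iceberg_read_test",
    format="ICEBERG",
)
```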
1 change: 1 addition & 0 deletions docs/docs/iceberg/intro.md
@@ -39,6 +39,7 @@ These are the Iceberg catalogs supported in Bodo Python and SQL:
| Tabular's RESTCatalog | Yes | Yes, via the TabularCatalog | Only tested on S3 |
| GlueCatalog | Yes | Yes, via TablePath | |
| HiveCatalog | Yes | Yes, via TablePath | |
| S3 Tables | Yes | No | |


## Limitations and Considerations
5 changes: 5 additions & 0 deletions docs/docs/iceberg/read_write.md
@@ -103,6 +103,11 @@ The following catalogs are supported:
- Parameter `token` or `credential` is required for authentication and should be retrieved from the REST catalog provider.
- E.g. `iceberg+rest` or `iceberg+rest://<rest-uri>?warehouse=<warehouse>&token=<token>`

- S3 Tables
- Connection string must be of the form `iceberg+arn:aws:s3tables:<region>:<account_number>:bucket/<bucket>`
- `params` is unused
- E.g. `iceberg+arn:aws:s3tables:us-west-2:123456789012:bucket/mybucket`
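For illustration, a minimal read sketch against an S3 Tables bucket using this connection string form; the region, account number, bucket, namespace, and table names below are placeholders:

```python
import pandas as pd
import bodo

@bodo.jit
def read_table(table_name, conn, db_schema):
    # db_schema is the S3 Tables namespace
    return pd.read_sql_table(table_name, conn, db_schema)

# Placeholder ARN; AWS_REGION should match the bucket's region.
conn = "iceberg+arn:aws:s3tables:us-west-2:123456789012:bucket/mybucket"
df = read_table("my_table", conn, "my_namespace")
```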

#### Pandas APIs {#iceberg-pandas}

Example code for reading:
10 changes: 8 additions & 2 deletions iceberg/bodo_iceberg_connector/catalog_conn.py
@@ -5,7 +5,9 @@

from bodo_iceberg_connector.errors import IcebergError, IcebergWarning

CatalogType = Literal["hadoop", "hive", "nessie", "glue", "snowflake", "rest"]
CatalogType = Literal[
"hadoop", "hive", "nessie", "glue", "snowflake", "rest", "s3tables"
]


def _get_first(elems: dict[str, list[str]], param: str) -> str | None:
@@ -64,6 +66,8 @@ def parse_conn_str(
catalog_type = "snowflake"
elif parsed_conn.scheme == "rest":
catalog_type = "rest"
elif parsed_conn.scheme == "arn" and "aws:s3tables" in parsed_conn.path:
catalog_type = "s3tables"

else:
types = ", ".join(
@@ -75,6 +79,7 @@
"glue",
"snowflake",
"rest",
"s3tables",
]
)
raise IcebergError(
@@ -90,10 +95,11 @@
"glue",
"snowflake",
"rest",
"s3tables",
]

# Get Warehouse Location
if catalog_type != "snowflake" and warehouse is None:
if catalog_type not in ("snowflake", "s3tables") and warehouse is None:
warnings.warn(
"It is recommended that the `warehouse` property is included in the connection string for this type of catalog. Bodo can automatically infer what kind of FileIO to use from the warehouse location. It is also highly recommended to include with Glue and Nessie catalogs.",
IcebergWarning,
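As a rough illustration of the detection logic added above (a standard-library sketch, not the connector's actual entry point): the `iceberg+` prefix is stripped before parsing, and the remaining ARN yields scheme `arn` with `aws:s3tables` in the path, which maps to the `s3tables` catalog type:

```python
from urllib.parse import urlparse

conn = "iceberg+arn:aws:s3tables:us-east-2:427443013497:bucket/unittest-bucket"
# The "iceberg+" prefix is removed before the connector parses the string.
parsed = urlparse(conn.removeprefix("iceberg+"))
print(parsed.scheme)                  # "arn"
print("aws:s3tables" in parsed.path)  # True -> catalog_type = "s3tables"
```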
14 changes: 13 additions & 1 deletion iceberg/bodo_iceberg_connector/iceberg-java/pom.xml
@@ -18,7 +18,7 @@
<iceberg.version>1.5.2</iceberg.version>
<hadoop.version>3.3.3</hadoop.version>
<aws.old.version>1.12.382</aws.old.version>
<aws.version>2.19.13</aws.version>
<aws.version>2.29.26</aws.version>
</properties>

<dependencies>
@@ -227,6 +227,18 @@
<version>${aws.version}</version>
</dependency>

<!-- S3Tables Dependencies -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3tables</artifactId>
<version>${aws.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.s3tables</groupId>
<artifactId>s3-tables-catalog-for-iceberg</artifactId>
<version>0.1.3</version>
</dependency>

<!-- Logging Dependencies (to avoid duplicate versions) -->
<!-- SLF4J Complains when different versions are registered -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
@@ -51,11 +51,18 @@ public static Triple<Configuration, Map<String, String>, URIBuilder> prepareInput
public static Catalog create(String connStr, String catalogType, String coreSitePath)
throws URISyntaxException {
// Create Catalog
final Catalog catalog;

// S3Tables doesn't use a URI
if (connStr.startsWith("arn:aws:s3tables") && catalogType.equals("s3tables")) {
catalog = S3TablesBuilder.create(connStr);
return CachingCatalog.wrap(catalog);
}

var out = prepareInput(connStr, catalogType, coreSitePath);
Configuration conf = out.getFirst();
Map<String, String> params = out.getSecond();
URIBuilder uriBuilder = out.getThird();
final Catalog catalog;

switch (catalogType.toLowerCase()) {
case "nessie":
@@ -0,0 +1,13 @@
package com.bodo.iceberg.catalog;

import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import software.amazon.s3tables.iceberg.S3TablesCatalog;

public class S3TablesBuilder {
public static Catalog create(String connStr) {
S3TablesCatalog catalog = new S3TablesCatalog();
catalog.initialize("S3Tables_catalog", Map.of("warehouse", connStr));
return catalog;
}
}
10 changes: 2 additions & 8 deletions iceberg/bodo_iceberg_connector/schema.py
@@ -9,11 +9,10 @@
from py4j.protocol import Py4JError

from bodo_iceberg_connector.catalog_conn import (
gen_table_loc,
normalize_data_loc,
parse_conn_str,
)
from bodo_iceberg_connector.errors import IcebergError, IcebergJavaError
from bodo_iceberg_connector.errors import IcebergJavaError
from bodo_iceberg_connector.py4j_support import (
get_catalog,
launch_jvm,
@@ -113,12 +112,7 @@ def get_iceberg_info(conn_str: str, schema: str, table: str, error: bool = True)
iceberg_schema = None
partition_spec = []
sort_order = []

if warehouse is None:
raise IcebergError(
"`warehouse` parameter required in connection string"
)
table_loc = gen_table_loc(catalog_type, warehouse, schema, table) # type: ignore
table_loc = ""
Comment on lines -116 to +115

Contributor Author:
We don't need this anymore, AFAIK, since we now create a transaction to get the write location.

Contributor:
Is this not used for reads occasionally? Can we test with the E2E tests? It's usually different catalogs that exhibit this behavior.

Contributor Author:
I didn't think so, but I'll check locally since the e2e tests are broken right now.

Contributor Author:
OK, I tried to get the e2e tests running locally and couldn't, even on main. Hadoop did pass, which is the only catalog we don't have unit test coverage for, so I think we're good.

Contributor:
OK

else:
schema_id: int | None = java_table_info.getSchemaID()