refactor(python): drop custom filesystem in write_deltalake #2137

Merged (3 commits) on Jan 30, 2024
python/deltalake/writer.py (12 changes: 1 addition & 11 deletions)
```diff
@@ -84,7 +84,6 @@ def write_deltalake(
     *,
     schema: Optional[Union[pa.Schema, DeltaSchema]] = ...,
     partition_by: Optional[Union[List[str], str]] = ...,
-    filesystem: Optional[pa_fs.FileSystem] = None,
     mode: Literal["error", "append", "overwrite", "ignore"] = ...,
     file_options: Optional[ds.ParquetFileWriteOptions] = ...,
     max_partitions: Optional[int] = ...,
@@ -147,7 +146,6 @@ def write_deltalake(
     *,
     schema: Optional[Union[pa.Schema, DeltaSchema]] = None,
     partition_by: Optional[Union[List[str], str]] = None,
-    filesystem: Optional[pa_fs.FileSystem] = None,
     mode: Literal["error", "append", "overwrite", "ignore"] = "error",
     file_options: Optional[ds.ParquetFileWriteOptions] = None,
     max_partitions: Optional[int] = None,
@@ -185,9 +183,6 @@ def write_deltalake(
         schema: Optional schema to write.
         partition_by: List of columns to partition the table by. Only required
             when creating a new table.
-        filesystem: Optional filesystem to pass to PyArrow. If not provided will
-            be inferred from uri. The file system has to be rooted in the table root.
-            Use the pyarrow.fs.SubTreeFileSystem, to adopt the root of pyarrow file systems.
         mode: How to handle existing data. Default is to error if table already exists.
             If 'append', will add new data.
             If 'overwrite', will replace table with new data.
@@ -216,7 +211,7 @@ def write_deltalake(
         description: User-provided description for this table.
         configuration: A map containing configuration options for the metadata action.
         overwrite_schema: If True, allows updating the schema of the table.
-        storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined.
+        storage_options: options passed to the native delta filesystem.
         predicate: When using `Overwrite` mode, replace data that matches a predicate. Only used in rust engine.
         partition_filters: the partition filters that will be used for partition overwrite. Only used in pyarrow engine.
         large_dtypes: If True, the data schema is kept in large_dtypes, has no effect on pandas dataframe input.
@@ -295,11 +290,6 @@ def write_deltalake(
 
     elif engine == "pyarrow":
         # We need to write against the latest table version
-        if filesystem is not None:
-            raise NotImplementedError(
-                "Filesystem support is not yet implemented. #570"
-            )
-
         filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))
 
         if table:  # already exists
```
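With the `filesystem` argument gone, the pyarrow engine always builds its filesystem internally by wrapping `DeltaStorageHandler` in `pa_fs.PyFileSystem`, so callers configure remote access through `storage_options` alone. A minimal migration sketch, assuming an S3 table; the URI and credential values below are placeholders, not taken from this PR:

```python
import pyarrow as pa

from deltalake import write_deltalake

data = pa.table({"i": [1, 2, 3]})

# Before this change a pyarrow filesystem rooted at the table could be
# passed via the `filesystem` argument. Now credentials travel through
# storage_options, and the writer derives the filesystem from the URI.
write_deltalake(
    "s3://my-bucket/my-table",  # placeholder table URI
    data,
    mode="append",
    storage_options={
        "AWS_ACCESS_KEY_ID": "...",  # placeholder credentials
        "AWS_SECRET_ACCESS_KEY": "...",
        "AWS_REGION": "us-east-1",
    },
)
```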
python/tests/test_benchmark.py (13 changes: 0 additions & 13 deletions)
```diff
@@ -35,19 +35,6 @@ def test_benchmark_write(benchmark, sample_table, tmp_path, engine):
     assert dt.to_pyarrow_table().sort_by("i") == sample_table
 
 
-# TODO: support wrapping PyArrow filesystems
-# @pytest.mark.benchmark(
-#     group="write"
-# )
-# def test_benchmark_write_pyarrow(benchmark, sample_table, tmp_path):
-#     fs = pa_fs.SubTreeFileSystem(str(tmp_path), pa_fs.LocalFileSystem())
-
-#     benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite", filesystem=fs)
-
-#     dt = DeltaTable(str(tmp_path))
-#     assert dt.to_pyarrow_table(filesystem=fs).sort_by("i") == sample_table
-
-
 @pytest.mark.benchmark(group="read")
 def test_benchmark_read(benchmark, sample_table, tmp_path):
     write_deltalake(str(tmp_path), sample_table)
```
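For reference, a version of the deleted benchmark that fits the new API would simply hand `write_deltalake` the plain path and let it resolve the local filesystem itself. A sketch only; the test name is hypothetical, and the surviving `test_benchmark_write` above already exercises this route:

```python
import pytest

from deltalake import DeltaTable, write_deltalake


@pytest.mark.benchmark(group="write")
def test_benchmark_write_local(benchmark, sample_table, tmp_path):
    # No SubTreeFileSystem wrapper: the writer infers a local
    # filesystem from the plain path string.
    benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite")

    dt = DeltaTable(str(tmp_path))
    assert dt.to_pyarrow_table().sort_by("i") == sample_table
```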
python/tests/test_fs.py (14 changes: 0 additions & 14 deletions)
```diff
@@ -159,20 +159,6 @@ def test_roundtrip_s3_direct(s3_localstack_creds, sample_data: pa.Table):
     table = dt.to_pyarrow_table()
     assert table == sample_data
 
-    # TODO: Refactor so DeltaTable can be instantiated with a storage backend
-    # Can provide S3Filesystem from pyarrow
-    # pa_s3fs = S3FileSystem(
-    #     access_key=s3_localstack_creds["AWS_ACCESS_KEY_ID"],
-    #     secret_key=s3_localstack_creds["AWS_SECRET_ACCESS_KEY"],
-    #     endpoint_override=s3_localstack_creds["AWS_ENDPOINT_URL"],
-    #     scheme="http",
-    # )
-
-    # write_deltalake(table_path, sample_data, filesystem=pa_s3fs, mode="overwrite")
-    # assert dt.version() == 2
-    # table = dt.to_pyarrow_table()
-    # assert table == sample_data
-
 
 @pytest.mark.azure
 @pytest.mark.integration
```
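The deleted block depended on handing a pyarrow `S3FileSystem` to `write_deltalake`. Under the new API the same localstack round-trip would route the credentials through `storage_options` instead. A sketch under those assumptions; `table_path` is taken to be the s3:// URI used earlier in the test, and the `AWS_ALLOW_HTTP` key is my assumption for talking to a non-TLS endpoint:

```python
# Sketch: reuse the s3_localstack_creds fixture instead of building
# a pyarrow S3FileSystem; deltalake constructs its own filesystem.
write_deltalake(
    table_path,
    sample_data,
    mode="overwrite",
    storage_options={
        "AWS_ACCESS_KEY_ID": s3_localstack_creds["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": s3_localstack_creds["AWS_SECRET_ACCESS_KEY"],
        "AWS_ENDPOINT_URL": s3_localstack_creds["AWS_ENDPOINT_URL"],
        "AWS_ALLOW_HTTP": "true",  # assumption: localstack endpoint is http
    },
)
```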