-
Notifications
You must be signed in to change notification settings - Fork 415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: pass storage options down when getting delta table #893
Conversation
python/deltalake/writer.py
Outdated
storage_options = (table._storage_options or {}).update( | ||
storage_options or {} | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@roeap Does this seems like an acceptable simplification? This would mean the storage options provided will override any ones that were originally specified in the table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should work fine. Right now we have an additional issue though, when is comes to storage options. Not entirely sure, but in some cases environment variables set may take precedence over configuration ... I remember fixing this in some places, but fore S§ I am not 100% sure..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't environment variable overriding code configuration expected behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, not sure. My personal expectation would be that my explicit config takes precedence over the environment. Looking at e.g. multiple targets it seems to be easier to manage. IIRC the python file systems behave like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I currently get:
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Generic { store: "S3", source: CreateMultipartRequest { source: Error { retries: 0, message: "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided
With these changes.
If you have configured the AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
environment variables, they have higher precedence than the values you've stored in your credentials file. In my case however they are the same, so should not matter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
python/deltalake/writer.py
Outdated
storage_options = (table._storage_options or {}).update( | ||
storage_options or {} | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should work fine. Right now we have an additional issue though, when is comes to storage options. Not entirely sure, but in some cases environment variables set may take precedence over configuration ... I remember fixing this in some places, but fore S§ I am not 100% sure..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some insights from me, if valuable at all.
@@ -153,7 +153,7 @@ def write_deltalake( | |||
else: | |||
# Non-existant local paths are only accepted as fully-qualified URIs | |||
table_uri = "file://" + str(Path(table_or_uri).absolute()) | |||
table = try_get_deltatable(table_or_uri) | |||
table = try_get_deltatable(table_or_uri, storage_options) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this myself by also adding:
def try_get_deltatable(table_uri: str, storage_options: Dict[str, str], version: Optional[int] = None) -> Optional[DeltaTable]:
try:
return DeltaTable(table_uri=table_uri, storage_options=storage_options, version=version)
except PyDeltaTableError as err:
# TODO: There has got to be a better way...
if "Not a Delta table" in str(err):
return None
elif "cannot find" in str(err):
return None
elif "No such file or directory" in str(err):
return None
else:
raise
This will return None
when the table does not exist, instead of failing for creds, but potentially also taking the version into account later...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will return None when the table does not exist, instead of failing for creds, but potentially also taking the version into account later...
I'm not sure what you are saying here. It's supposed to return None when the table does not exist. Are you saying that if it just fails to authenticate to the Object store it might also return None?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, to be clear.
I'm saying the logic seem correct and yes it should return None in this case.
python/deltalake/writer.py
Outdated
else: | ||
filesystem = pa_fs.PyFileSystem( | ||
DeltaStorageHandler(table_uri, storage_options) | ||
storage_options = (table._storage_options or {}).update( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the same as I tried:
if filesystem is None:
if table is not None:
storage_options = (table._storage_options or {}).update(
storage_options or {}
)
filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))
python/deltalake/writer.py
Outdated
storage_options = (table._storage_options or {}).update( | ||
storage_options or {} | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I currently get:
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Generic { store: "S3", source: CreateMultipartRequest { source: Error { retries: 0, message: "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<Error><Code>SignatureDoesNotMatch</Code><Message>The request signature we calculated does not match the signature you provided
With these changes.
If you have configured the AWS_ACCESS_KEY_ID
and AWS_SECRET_ACCESS_KEY
environment variables, they have higher precedence than the values you've stored in your credentials file. In my case however they are the same, so should not matter.
b4ba322
to
0d5cb2f
Compare
@@ -29,3 +32,75 @@ def test_read_simple_table_from_remote(s3_localstack): | |||
table_path = "s3://deltars/simple" | |||
dt = DeltaTable(table_path) | |||
assert dt.to_pyarrow_table().equals(pa.table({"id": [5, 7, 9]})) | |||
|
|||
|
|||
@pytest.mark.s3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wjones127 is this you adding an integration test for S3? Based on our other conversations, since it seems like the unit-test were just testing on your local filesystem writing new delta tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
if filesystem is not None: | ||
raise NotImplementedError("Filesystem support is not yet implemented. #570") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a note of what we have to do here: #570 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @wjones127 👍
It's great to include integration tests; we will avoid regressions in our future changes.
LGTM, it's OK to include this PR in the v0.6.3
version of the Python binding!
Does this now work based on previous issues? Is it released in the latest version? |
Description
We forgot to pass down storage options, so storage options were being ignored if you were passing a URI and not a DeltaTable.
This also adds a new S3 option:
AWS_S3_ALLOW_UNSAFE_RENAME
which allows using the default S3 backend without support for concurrent writers. This is an opt-in option and there is a helpful error message to provide guidance to users.To test, I implemented integration tests that run against localstack.
Related Issue(s)
Documentation