Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add write and delete operations to ObjectStore #2246

Closed
wants to merge 14 commits into from
51 changes: 50 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ mod roundtrip_tests {
self,
object_store::{
local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader,
ObjectStore,
ObjectStore, ObjectWriter,
},
SizedFile,
},
Expand Down Expand Up @@ -1016,6 +1016,55 @@ mod roundtrip_tests {
"this is only a test object store".to_string(),
))
}

fn file_writer(
&self,
_path: &str,
) -> datafusion_data_access::Result<Arc<dyn ObjectWriter>> {
unimplemented!();
}

async fn create_dir(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was curious what this corresponded to given that object stores don't have a concept of a directory. It would appear that at least in the case of the S3 implementation, it creates empty objects as pretend directories - https://github.com/apache/arrow/blob/master/cpp/src/arrow/filesystem/s3fs.cc

I'm not sure how I feel about this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I didn't know that was the behavior in C++ Arrow. I started a sibling PR to implement these methods for S3 in datafusion-contrib/datafusion-objectstore-s3#54, so I might prototype some more there and see if that's necessary in any way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like fsspec implementation doesn't create empty objects as directories, so I think it is sensible to do the same.

It's not totally a no-op though for cases like S3. You might want to have create_dir create a bucket if it doesn't exist (though I'd like for there to be an option to turn off bucket creation). And it should probably check that there isn't an existing file at the path you are trying to create a directory at.

I think what I will do is document the expected behavior and create a generic test that can be run on an arbitrary filesystem to help implementation make sure they follow expected behavior.

&self,
_path: &str,
_recursive: bool,
) -> datafusion_data_access::Result<()> {
unimplemented!();
}

async fn remove_dir_all(
&self,
_path: &str,
) -> datafusion_data_access::Result<()> {
unimplemented!();
}

async fn remove_dir_contents(
&self,
_path: &str,
) -> datafusion_data_access::Result<()> {
unimplemented!();
}

async fn remove_file(&self, _path: &str) -> datafusion_data_access::Result<()> {
unimplemented!();
}

async fn rename(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not aware of any object stores that support native rename, I think this will need to be implemented as a copy and remove. Perhaps we could provide this as the default implementation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, in Rust you can't override implementations from a trait, so I'm reluctant to provide a default implementation. But agreed that as far as I can see that does seem to be the pattern.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&self,
_source: &str,
_dest: &str,
) -> datafusion_data_access::Result<()> {
unimplemented!();
}

async fn copy(
&self,
_source: &str,
_dest: &str,
) -> datafusion_data_access::Result<()> {
unimplemented!();
}
}

// Given a identity of a LogicalPlan converts it to protobuf and back, using debug formatting to test equality.
Expand Down
2 changes: 1 addition & 1 deletion data-access/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ path = "src/lib.rs"

[dependencies]
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
chrono = { version = "0.4", features = ["std"] }
futures = "0.3"
parking_lot = "0.12"
tempfile = "3"
Expand Down
Loading