-
Notifications
You must be signed in to change notification settings - Fork 452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add TableProviderFactory and test for SQL to register tables dynamically at runtime #892
Conversation
rust/tests/datafusion_test.rs
Outdated
|
||
let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR")); | ||
d.push("tests/data/delta-0.8.0-partitioned"); | ||
let sql = format!("CREATE EXTERNAL TABLE demo STORED AS DELTATABLE LOCATION '{}'", d.to_str().unwrap()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the goal of the PR
rust/Cargo.toml
Outdated
@@ -9,7 +9,7 @@ description = "Native Delta Lake implementation in Rust" | |||
edition = "2021" | |||
|
|||
[dependencies] | |||
arrow = { version = "22", optional = true } | |||
arrow = { version = "24", optional = true } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to upgrade arrow to match Datafusion master
rust/Cargo.toml
Outdated
datafusion = { version = "12", optional = true } | ||
datafusion-expr = { version = "12", optional = true } | ||
datafusion-common = { version = "12", optional = true } | ||
datafusion = { git = "https://github.com/spaceandtimelabs/arrow-datafusion.git", rev = "cb9241982556745939175b0597194ad819bab613", optional = true } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will point back at apache/arrow-datafusion
once that PR (#3867) is merged. Then probably v14.0.0
once it is released.
This is working in Ballista from DataGrip through FlightSQL! 🥳 @alamb that's the end goal for the related datafusion PR |
d96c625
to
3795587
Compare
@houqp I was finally able to get Ballista working running deltalake queries. I'd appreciate a review when you get the chance (and a CI kick would be appreciated as well). |
I fixed the CI checks I could, and fixed datafusion to not require |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very cool!
I have an option suggestion and a question on how we are handling storage options.
env.register_object_store( | ||
url.scheme(), | ||
url.host_str().unwrap_or_default(), | ||
table.object_store(), | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, any URI passed with the same scheme and bucket name will match to this object store? And the object stores options will be preserved? I'm worried about the cases where storage options were directly passed into the DeltaTable
instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the object store url we get from table.storage.object_store_url
is just for internal purposes and should not collide with common urls. It uses delta-rs://
as a scheme and does a little bit of pruning required for widows paths.
let provider = open_table(url).await.unwrap(); | ||
Ok(Arc::new(provider)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is perhaps a place where storage_options
will get lost, if we care about that. If we only want to support passing credentials via env variables for systems that use this (just distributed, right?), that might be sensible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we only want to support passing credentials via env variables for systems that use this (just distributed, right?), that might be sensible.
For now, that is all I am attempting to do with this PR. I can possibly see someone wanting to file another PR to extend it in the future, but if they are passing storage_options
, I think that means they are registering tables in Rust (not SQL), and therefor already are writing a fork of Ballista, in which case they can write their own serde?
It's a worthwhile thought experiment, but I wouldn't recommend coding for it quite yet...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fair. I just wanted to make sure you were aware of that limitation.
I just marked #852 as ready for review. That should fix the python builds which are breaking due to the arrow version bump as well as handle the parquet2 build that needed some changes as parquet-format moved into the parquet crate. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great work! really excited to see this land.
env.register_object_store( | ||
url.scheme(), | ||
url.host_str().unwrap_or_default(), | ||
table.object_store(), | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the object store url we get from table.storage.object_store_url
is just for internal purposes and should not collide with common urls. It uses delta-rs://
as a scheme and does a little bit of pruning required for widows paths.
fn with_new_children( | ||
self: Arc<Self>, | ||
children: Vec<Arc<dyn ExecutionPlan>>, | ||
) -> DataFusionResult<Arc<dyn ExecutionPlan>> { | ||
ExecutionPlan::with_new_children(self.parquet_scan.clone(), children) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may be completely off, but can a scan operation even have children? ParquetScan
itself just returns Ok(self)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can a scan operation even have children?
I would think not, but the goal of this implementation was merely to proxy every call to the wrapped ParquetScan
with the one additional behavior of registering the ObjectStore
prior to proxying the execute()
request... so I just followed that pattern here. I think it's probably best to leave it this way just in case there is some future change where a ParquetScan
might have children for some reason - possibly because someone wants to register their own proxy in the same way I just did :)
I did some fixes, so there's a reasonable chance CI should pass now. |
It looks like the remaining errors are in src/python, so I assume they will be resolved by #852 . |
c5c07b4
to
4992277
Compare
cb553ea
to
92f1960
Compare
92f1960
to
6a91f89
Compare
I see #852 was merged. I think we can merge this now. If someone wouldn't mind kicking off CI, I'd appreciate it! |
I don't understand the python test failure... it looks like a timeout, is it an intermittent test? Or should I try to dig into it more? |
Yeah I think that is unrelated. I'll need to look into that. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Thanks!
Thanks everyone! |
Thank you! |
Description
Use (hopefully soon-to-be-merged) Datafusion functionality to allow dynamic (i.e. SQL-based) registration of deltalake tables at runtime.
Related Issue(s)
CREATE EXTERNAL TABLE dt STORED AS DELTATABLE
syntax with datafusion #891Documentation