Add TableProviderFactory and test for SQL to register tables dynamically at runtime #892

Merged: 5 commits, Nov 17, 2022
Changes from all commits
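The central addition is a `TableProviderFactory` implementation so DataFusion SQL such as `CREATE EXTERNAL TABLE ... STORED AS DELTATABLE LOCATION '...'` can register a Delta table at runtime. As a rough orientation only, a factory of that shape might look like the sketch below; the trait path, the single-URL `create` signature, and the `DELTATABLE` keyword are assumptions about the DataFusion 14-era API rather than a copy of the code merged in this PR.

```rust
// Illustrative sketch, not the merged implementation. Assumes the DataFusion
// 14-era TableProviderFactory trait that receives a bare location string.
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::datasource::{TableProvider, TableProviderFactory};
use datafusion::error::{DataFusionError, Result};

pub struct DeltaTableFactory {}

#[async_trait]
impl TableProviderFactory for DeltaTableFactory {
    async fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>> {
        // DeltaTable implements TableProvider when the datafusion feature is
        // enabled, so the opened table can go straight back to the engine.
        let table = deltalake::open_table(url)
            .await
            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
        Ok(Arc::new(table))
    }
}
```

Registered against a session's table factories under a keyword such as `DELTATABLE`, this is what lets plain SQL create a queryable Delta table without a Rust-side `register_table` call (the exact registration API varies across DataFusion versions).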
125 changes: 115 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion glibc_version/src/lib.rs
@@ -26,7 +26,7 @@ mod imp {
// glibc version is taken from std/sys/unix/os.rs
pub fn get_version() -> Result<Version, String> {
let output = Command::new("ldd")
.args(&["--version"])
.args(["--version"])
.output()
.expect("failed to execute ldd");
let output_str = std::str::from_utf8(&output.stdout).unwrap();
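The only change in this file is dropping the borrow on the argument array: `Command::args` accepts any `IntoIterator`, and arrays iterate by value since Rust 1.53, so `&["--version"]` can become `["--version"]` (the kind of cleanup clippy suggests). A standalone version of the call, for reference:

```rust
// Same call as above, shown in isolation: no borrow needed on the array.
use std::process::Command;

fn main() {
    let output = Command::new("ldd")
        .args(["--version"])
        .output()
        .expect("failed to execute ldd");
    println!("{}", String::from_utf8_lossy(&output.stdout));
}
```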
4 changes: 3 additions & 1 deletion rust/Cargo.toml
@@ -49,6 +49,7 @@ rusoto_glue = { version = "0.48", default-features = false, optional = true }
datafusion = { version = "14", optional = true }
datafusion-expr = { version = "14", optional = true }
datafusion-common = { version = "14", optional = true }
datafusion-proto = { version = "14", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
@@ -72,14 +73,15 @@ tempfile = "3"
utime = "0.3"

[build-dependencies]
glibc_version = "0"
glibc_version = { path = "../glibc_version" }

[features]
default = ["arrow", "parquet"]
datafusion-ext = [
"datafusion",
"datafusion-expr",
"datafusion-common",
"datafusion-proto",
"arrow",
"parquet",
]
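For consumers of the crate, the DataFusion integration remains gated behind the `datafusion-ext` feature, which now also pulls in `datafusion-proto`. A hypothetical downstream `Cargo.toml` entry (the version number is a placeholder) would look something like:

```toml
# Hypothetical consumer dependency; the version number is a placeholder.
[dependencies]
deltalake = { version = "0.x", features = ["datafusion-ext"] }
```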
66 changes: 62 additions & 4 deletions rust/src/builder/mod.rs
@@ -1,6 +1,7 @@
//! Create or load DeltaTables

use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
@@ -18,6 +19,9 @@ use url::Url;
use crate::storage::s3::{S3StorageBackend, S3StorageOptions};
#[cfg(any(feature = "s3", feature = "s3-rustls"))]
use object_store::aws::AmazonS3Builder;
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

#[cfg(feature = "azure")]
mod azure;
@@ -59,7 +63,8 @@ impl Default for DeltaVersion {
}

/// Configuration options for delta table
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DeltaTableConfig {
/// Indicates whether our use case requires tracking tombstones.
/// This defaults to `true`
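Deriving `Serialize`/`Deserialize` with `rename_all = "camelCase"` means the config round-trips with camelCase keys. A self-contained sketch of the effect, using only the `require_tombstones` field visible in this diff:

```rust
// Minimal illustration of #[serde(rename_all = "camelCase")] on a config-style
// struct; ExampleConfig is a stand-in, not the full DeltaTableConfig.
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ExampleConfig {
    require_tombstones: bool,
}

fn main() {
    let cfg = ExampleConfig { require_tombstones: true };
    // Field names are emitted in camelCase.
    let json = serde_json::to_string(&cfg).unwrap();
    assert_eq!(json, r#"{"requireTombstones":true}"#);
    let back: ExampleConfig = serde_json::from_str(&json).unwrap();
    assert!(back.require_tombstones);
}
```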
@@ -227,13 +232,21 @@ impl DeltaTableBuilder {
/// table use the `load` function
pub fn build(self) -> Result<DeltaTable, DeltaTableError> {
let (storage, storage_url) = match self.options.storage_backend {
// Some(storage) => storage,
Some((store, path)) => {
let mut uri = self.options.table_uri + path.as_ref();
if !uri.contains(':') {
uri = format!("file://{}", uri);
}
let url = Url::parse(uri.as_str())
.map_err(|_| DeltaTableError::Generic(format!("Can't parse uri: {}", uri)))?;
let url = StorageUrl::new(url);
(store, url)
}
None => get_storage_backend(
&self.options.table_uri,
self.storage_options,
self.allow_http,
)?,
_ => todo!(),
};
let config = DeltaTableConfig {
require_tombstones: self.options.require_tombstones,
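The `Some((store, path))` arm normalizes the table URI before parsing: anything without a scheme separator is treated as a local path and prefixed with `file://`. A small standalone sketch of that rule (the helper name is made up for illustration):

```rust
// Hypothetical helper mirroring the normalization above: scheme-less strings
// are assumed to be local filesystem paths.
use url::Url;

fn normalize_table_uri(uri: &str) -> Result<Url, url::ParseError> {
    let uri = if uri.contains(':') {
        uri.to_string()
    } else {
        format!("file://{}", uri)
    };
    Url::parse(&uri)
}

fn main() {
    assert_eq!(normalize_table_uri("/tmp/delta-table").unwrap().scheme(), "file");
    assert_eq!(normalize_table_uri("s3://bucket/prefix").unwrap().scheme(), "s3");
}
```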
@@ -282,6 +295,51 @@ pub struct StorageUrl {
pub(crate) prefix: Path,
}

impl Serialize for StorageUrl {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(None)?;
seq.serialize_element(self.url.as_str())?;
seq.serialize_element(&self.prefix.to_string())?;
seq.end()
}
}

impl<'de> Deserialize<'de> for StorageUrl {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct StorageUrlVisitor {}
impl<'de> Visitor<'de> for StorageUrlVisitor {
type Value = StorageUrl;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("struct StorageUrl")
}

fn visit_seq<V>(self, mut seq: V) -> Result<StorageUrl, V::Error>
where
V: SeqAccess<'de>,
{
let url = seq
.next_element()?
.ok_or_else(|| V::Error::invalid_length(0, &self))?;
let prefix: &str = seq
.next_element()?
.ok_or_else(|| V::Error::invalid_length(1, &self))?;
let url = Url::parse(url).map_err(|_| V::Error::missing_field("url"))?;
let prefix = Path::parse(prefix).map_err(|_| V::Error::missing_field("prefix"))?;
let url = StorageUrl { url, prefix };
Ok(url)
}
}
deserializer.deserialize_seq(StorageUrlVisitor {})
}
}

impl StorageUrl {
/// Parse a provided string as a `StorageUrl`
///
@@ -422,7 +480,7 @@ impl std::fmt::Display for StorageUrl {
}

/// Create a new storage backend used in Delta table
fn get_storage_backend(
pub(crate) fn get_storage_backend(
table_uri: impl AsRef<str>,
// annotation needed for some feature builds
#[allow(unused_variables)] options: Option<HashMap<String, String>>,
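One more note on the hand-written `Serialize`/`Deserialize` for `StorageUrl` earlier in this diff: it encodes the struct as a fixed-order two-element sequence (URL string, then prefix) instead of a map. Below is a self-contained sketch of the same pattern on a made-up `Endpoint` type, so the serde machinery can be seen in isolation:

```rust
// Standalone demonstration of serializing a struct as a fixed-order sequence,
// mirroring the StorageUrl impls in the diff. Endpoint and its fields are
// hypothetical; only the serde pattern is the point.
use std::fmt;

use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

#[derive(Debug, PartialEq)]
struct Endpoint {
    host: String,
    port: u16,
}

impl Serialize for Endpoint {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // Emit the fields positionally: [host, port].
        let mut seq = serializer.serialize_seq(Some(2))?;
        seq.serialize_element(&self.host)?;
        seq.serialize_element(&self.port)?;
        seq.end()
    }
}

impl<'de> Deserialize<'de> for Endpoint {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        struct EndpointVisitor;

        impl<'de> Visitor<'de> for EndpointVisitor {
            type Value = Endpoint;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                f.write_str("a [host, port] sequence")
            }

            fn visit_seq<V: SeqAccess<'de>>(self, mut seq: V) -> Result<Endpoint, V::Error> {
                // Elements must arrive in the same order they were written.
                let host = seq
                    .next_element()?
                    .ok_or_else(|| V::Error::invalid_length(0, &self))?;
                let port = seq
                    .next_element()?
                    .ok_or_else(|| V::Error::invalid_length(1, &self))?;
                Ok(Endpoint { host, port })
            }
        }

        deserializer.deserialize_seq(EndpointVisitor)
    }
}

fn main() {
    let e = Endpoint { host: "localhost".into(), port: 8080 };
    let json = serde_json::to_string(&e).unwrap();
    assert_eq!(json, r#"["localhost",8080]"#);
    assert_eq!(serde_json::from_str::<Endpoint>(&json).unwrap(), e);
}
```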