Skip to content

Commit

Permalink
Upgrade to DataFusion 32
Browse files (browse the repository at this point in the history)
  • Loading branch information
gruuya committed Nov 17, 2023
1 parent fdd7c49 commit 09b2d26
Show file tree
Hide file tree
Showing 19 changed files with 1,337 additions and 1,103 deletions.
2,017 changes: 1,063 additions & 954 deletions Cargo.lock

Large diffs are not rendered by default.

36 changes: 18 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ object-store-s3 = ["object_store/aws"]
remote-tables = ["dep:datafusion-remote-tables"]

[dependencies]
arrow = "45.0.0"
arrow-buffer = "45.0.0"
arrow-csv = "45.0.0"
arrow = "47.0.0"
arrow-buffer = "47.0.0"
arrow-csv = "47.0.0"
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "45.0.0"
arrow-schema = "45.0.0"
arrow-integration-test = "47.0.0"
arrow-schema = "47.0.0"
async-trait = "0.1.64"
base64 = "0.21.0"

Expand All @@ -46,25 +46,25 @@ clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.3"

# PG wire protocol support
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-30-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-30-upgrade", optional = true }
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-32-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-32-upgrade", optional = true }

datafusion = "30.0.0"
datafusion-common = "30.0.0"
datafusion-expr = "30.0.0"
datafusion = "32.0.0"
datafusion-common = "32.0.0"
datafusion-expr = "32.0.0"

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "787c13a63efa9ada96d303c10c093424215aaa80", features = ["s3-native-tls", "datafusion-ext"] }
dynamodb_lock = { version = "0.4.3", default_features = false, features = ["native-tls"] }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "2b913b37e71ed96212dcec8c3fc8e865754ced82", features = ["s3-native-tls", "datafusion-ext"] }
dynamodb_lock = { version = "0.6.1", default_features = false, features = ["native-tls"] }

futures = "0.3"
hex = ">=0.4.0"
itertools = ">=0.10.0"
lazy_static = ">=1.4.0"
log = "0.4"
moka = { version = "0.11.0", default_features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.6.1"
object_store = "0.7"
parking_lot = "0.12.1"
percent-encoding = "2.2.0"
pretty_env_logger = "0.4"
Expand All @@ -81,7 +81,7 @@ rmpv = { version = "1.0.0", features = ["with-serde"] }
serde = "1.0.156"
serde_json = "1.0.93"
sha2 = ">=0.10.1"
sqlparser = "0.36.1"
sqlparser = "0.38"
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
strum = ">=0.24"
strum_macros = ">=0.24"
Expand All @@ -90,11 +90,11 @@ tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signa
url = "2.2"
uuid = "1.2.1"
warp = "0.3.5"
wasi-common = "11.0.0"
wasi-common = "14.0.0"

# For WASM user-defined functions
wasmtime = "11.0.0"
wasmtime-wasi = "11.0.0"
wasmtime = "14.0.0"
wasmtime-wasi = "14.0.0"

[dev-dependencies]
assert_unordered = "0.3"
Expand All @@ -105,7 +105,7 @@ wiremock = "0.5"

[build-dependencies]
anyhow = "1.0.63" # for build.rs
prost-build = "0.11.6"
prost-build = "0.12.1"
vergen = "7"

[profile.release]
Expand Down
14 changes: 7 additions & 7 deletions datafusion_remote_tables/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = "45.0.0"
arrow-buffer = "45.0.0"
arrow-schema = "45.0.0"
arrow = "47.0.0"
arrow-buffer = "47.0.0"
arrow-schema = "47.0.0"
async-trait = "0.1.64"

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-30-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-32-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = "30.0.0"
datafusion-common = "30.0.0"
datafusion-expr = "30.0.0"
datafusion = "32.0.0"
datafusion-common = "32.0.0"
datafusion-expr = "32.0.0"
itertools = ">=0.10.0"
log = "0.4"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
Expand Down
16 changes: 11 additions & 5 deletions src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ impl DefaultCatalog {
// TODO: this means that any `information_schema.columns` query will serially load all
// delta tables present in the database. The real fix for this is to make DF use `TableSource`
// for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs.
let table_object_store = self.object_store.for_delta_table(table_uuid);
let table_log_store = self.object_store.get_log_store(table_uuid);

let table = DeltaTable::new(table_object_store, Default::default());
let table = DeltaTable::new(table_log_store, Default::default());
(Arc::from(table_name.to_string()), Arc::new(table) as _)
}

Expand All @@ -302,9 +302,15 @@ impl DefaultCatalog {
I: Iterator<Item = &'a AllDatabaseColumnsResult>,
{
let tables = collection_columns
.filter_map(|col| if let Some(table_name) = &col.table_name && let Some(table_uuid) = col.table_uuid {
Some(self.build_table(table_name, table_uuid))
} else { None })
.filter_map(|col| {
if let Some(table_name) = &col.table_name
&& let Some(table_uuid) = col.table_uuid
{
Some(self.build_table(table_name, table_uuid))
} else {
None
}
})
.collect::<HashMap<_, _>>();

(
Expand Down
4 changes: 2 additions & 2 deletions src/config/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub fn build_state_with_table_factories(
config: SessionConfig,
runtime: Arc<RuntimeEnv>,
) -> SessionState {
let mut state = SessionState::with_config_rt(config, runtime);
let mut state = SessionState::new_with_config_rt(config, runtime);

state
.table_factories_mut()
Expand Down Expand Up @@ -171,7 +171,7 @@ pub async fn build_context(

let runtime_env = RuntimeEnv::new(runtime_config)?;
let state = build_state_with_table_factories(session_config, Arc::new(runtime_env));
let context = SessionContext::with_state(state);
let context = SessionContext::new_with_state(state);

let object_store = build_object_store(&cfg.object_store);
let internal_object_store = Arc::new(InternalObjectStore::new(
Expand Down
14 changes: 6 additions & 8 deletions src/config/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,23 +135,23 @@ pub struct S3 {
impl S3 {
pub fn from_bucket_and_options(
bucket: String,
map: &HashMap<String, String>,
map: &mut HashMap<String, String>,
) -> Result<Self, ConfigError> {
Ok(S3 {
region: map.get("region").cloned(),
access_key_id: map
.get("access_key_id")
.remove("access_key_id")
.ok_or(ConfigError::Message(
"'access_key_id' not found in provided options".to_string(),
))?
.clone(),
secret_access_key: map
.get("secret_access_key")
.remove("secret_access_key")
.ok_or(ConfigError::Message(
"'secret_access_key' not found in provided options".to_string(),
))?
.clone(),
endpoint: map.get("endpoint").cloned(),
endpoint: map.remove("endpoint"),
bucket,
cache_properties: Some(ObjectCacheProperties::default()),
})
Expand All @@ -168,13 +168,11 @@ pub struct GCS {
impl GCS {
pub fn from_bucket_and_options(
bucket: String,
map: &HashMap<String, String>,
map: &mut HashMap<String, String>,
) -> Self {
GCS {
bucket,
google_application_credentials: map
.get("google_application_credentials")
.cloned(),
google_application_credentials: map.remove("google_application_credentials"),
cache_properties: Some(ObjectCacheProperties::default()),
}
}
Expand Down
Loading

0 comments on commit 09b2d26

Please sign in to comment.