refactor: table options #2767

Merged 47 commits from universalmind303/table-options-refactor into main on Mar 27, 2024.

Commits (all by universalmind303):
1cd4172  refactor: move parser into own crate (Mar 6, 2024)
e154cbf  refactor: move parser into own crate (Mar 6, 2024)
b80aff8  Merge branch 'main' into universalmind303/extract-parser (Mar 6, 2024)
1a2a931  remove main (Mar 6, 2024)
fec18cc  Merge branch 'universalmind303/extract-parser' of github.com:GlareDB/… (Mar 6, 2024)
e9ecd2a  cleanup (Mar 6, 2024)
2849edc  pr feedback (Mar 6, 2024)
04c88c8  refactor: remove proptests (#2748) (Mar 6, 2024)
932884e  wip: table options refactor (Mar 7, 2024)
2652d61  wip (Mar 7, 2024)
bb0ce70  wip (Mar 8, 2024)
d3f03c4  wip (Mar 8, 2024)
4335900  i think first pass is done (Mar 8, 2024)
ace5fc1  wip (Mar 8, 2024)
0f19436  get all tests passing, TODO: external schema (Mar 8, 2024)
2502a61  add some docstrings (Mar 11, 2024)
e85f623  remove validation from datasource (Mar 11, 2024)
305a74f  fmt (Mar 11, 2024)
d3a4c7c  fix the "columns" stuff (Mar 11, 2024)
7ca5256  refactor(table options): serde (#2770) (Mar 12, 2024)
5052e92  Merge branch 'universalmind303/table-options-refactor' of github.com:… (Mar 12, 2024)
6e8d7c0  Merge branch 'main' of github.com:GlareDB/glaredb into universalmind3… (Mar 12, 2024)
c46933d  pr feedback (Mar 12, 2024)
c056a0e  get schema stuff working (Mar 12, 2024)
4935cc3  refactor(table options): backwards compat for catalog (#2775) (Mar 12, 2024)
bc17867  fmt (Mar 12, 2024)
d9baec8  Merge branch 'universalmind303/table-options-refactor' of github.com:… (Mar 12, 2024)
ae2c21f  wip (Mar 14, 2024)
d3b03be  remove "datasource" stuff (Mar 14, 2024)
691980e  revert more code (Mar 14, 2024)
0e42b84  revert more code (Mar 14, 2024)
8a62643  revert more code (Mar 14, 2024)
2c178e0  clippy (Mar 14, 2024)
ddf88ce  cleanup (Mar 14, 2024)
918b94e  Merge branch 'main' into universalmind303/table-options-refactor (Mar 14, 2024)
49494ca  cleanup (Mar 14, 2024)
1fb7a26  revert more code (Mar 14, 2024)
b243e41  Merge branch 'main' into universalmind303/table-options-refactor (Mar 14, 2024)
5604c58  revert more code (Mar 14, 2024)
93ea929  Merge branch 'universalmind303/table-options-refactor' of github.com:… (Mar 14, 2024)
b7b6a58  tbl opts (Mar 14, 2024)
45cf0dc  fixes (Mar 14, 2024)
eefa93e  Merge branch 'main' of github.com:GlareDB/glaredb into universalmind3… (Mar 15, 2024)
0513e84  pr feedback (Mar 15, 2024)
9890312  pr feedback (Mar 15, 2024)
c72af53  clippy (Mar 15, 2024)
3bf7d5f  merge from main (Mar 27, 2024)
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

22 changes: 10 additions & 12 deletions crates/catalog/src/session_catalog.rs
@@ -18,11 +18,7 @@ use protogen::metastore::types::catalog::{
     TableEntry,
     TunnelEntry,
 };
-use protogen::metastore::types::options::{
-    InternalColumnDefinition,
-    TableOptions,
-    TableOptionsInternal,
-};
+use protogen::metastore::types::options::{InternalColumnDefinition, TableOptionsInternal};
 use tracing::debug;
 
 use super::client::MetastoreClientHandle;
@@ -461,12 +457,13 @@ impl TempCatalog {
                 external: false,
                 is_temp: true,
             },
-            options: TableOptions::Internal(TableOptionsInternal {
-                columns: columns.to_owned(),
-            }),
+            options: TableOptionsInternal {
+                columns: columns.clone(),
+            }
+            .into(),
             tunnel_id: None,
             access_mode: SourceAccessMode::ReadWrite,
-            columns: Some(columns.to_owned()),
+            columns: Some(columns),
         }
     })
 }
@@ -505,12 +502,13 @@ impl TempCatalog {
                 external: false,
                 is_temp: true,
             },
-            options: TableOptions::Internal(TableOptionsInternal {
+            options: TableOptionsInternal {
                 columns: Vec::new(),
-            }),
+            }
+            .into(),
             tunnel_id: None,
             access_mode: SourceAccessMode::ReadWrite,
-            columns: Some(Vec::new()),
+            columns: None,
         });
     }
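The `.into()` calls in this diff rely on a conversion from `TableOptionsInternal` into the catalog's table-options enum, which lives in the protogen crate and is not part of this diff. A minimal, self-contained sketch of the assumed shape (stand-in definitions, not GlareDB's real ones):

```rust
// Stand-in types; the real definitions live in protogen::metastore::types::options.
#[derive(Debug, Clone)]
pub struct InternalColumnDefinition {
    pub name: String,
    pub nullable: bool,
    // the real type also carries an arrow `DataType`
}

#[derive(Debug, Clone)]
pub struct TableOptionsInternal {
    pub columns: Vec<InternalColumnDefinition>,
}

#[derive(Debug, Clone)]
pub enum TableOptionsV0 {
    Internal(TableOptionsInternal),
    // ... one variant per datasource (Postgres, Gcs, S3, Delta, ...)
}

// This impl is what lets call sites write `TableOptionsInternal { .. }.into()`
// wherever a `TableOptionsV0` is expected.
impl From<TableOptionsInternal> for TableOptionsV0 {
    fn from(opts: TableOptionsInternal) -> Self {
        TableOptionsV0::Internal(opts)
    }
}

fn main() {
    let opts: TableOptionsV0 = TableOptionsInternal { columns: Vec::new() }.into();
    println!("{opts:?}");
}
```
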
10 changes: 5 additions & 5 deletions crates/datasources/src/bson/table.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
 
 use bson::RawDocumentBuf;
 use bytes::BytesMut;
-use datafusion::arrow::datatypes::{FieldRef, Schema};
+use datafusion::arrow::datatypes::Schema;
 use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::TableProvider;
 use datafusion::parquet::data_type::AsBytes;
@@ -20,12 +20,12 @@ use crate::object_store::{ObjStoreAccess, ObjStoreAccessor};
 pub async fn bson_streaming_table(
     store_access: Arc<dyn ObjStoreAccess>,
     source_url: DatasourceUrl,
-    fields: Option<Vec<FieldRef>>,
+    schema: Option<Schema>,
     schema_inference_sample_size: Option<i64>,
 ) -> Result<Arc<dyn TableProvider>, BsonError> {
     // TODO: set a maximum (1024?) or have an adaptive mode
     // (at least n but stop after n the same) or skip documents
-    let sample_size = if fields.is_some() {
+    let sample_size = if schema.is_some() {
         0
     } else {
         schema_inference_sample_size.unwrap_or(100)
@@ -93,8 +93,8 @@ pub async fn bson_streaming_table(
     let mut streams = Vec::<Arc<(dyn PartitionStream + 'static)>>::with_capacity(readers.len() + 1);
 
     // get the schema; if provided as an argument, just use that, otherwise, sample.
-    let schema = if let Some(fields) = fields {
-        Arc::new(Schema::new(fields))
+    let schema = if let Some(schema) = schema {
+        Arc::new(schema)
     } else {
         // iterate through the readers and build up a sample of the first <n>
         // documents to be used to infer the schema.
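`bson_streaming_table` now takes a ready-made `Option<Schema>` instead of a list of `FieldRef`s; per the `sample_size` logic above, `Some(schema)` disables document sampling entirely, while `None` samples up to 100 documents by default. A hypothetical caller-side fragment (the column names are invented for illustration):

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};

// A caller that knows its layout builds the Schema up front and skips
// schema inference; passing `None` would sample documents instead.
fn known_schema() -> Option<Schema> {
    Some(Schema::new(vec![
        Field::new("_id", DataType::Utf8, true),
        Field::new("value", DataType::Int64, true),
    ]))
}

fn main() {
    println!("{:?}", known_schema());
}
```
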
1 change: 1 addition & 0 deletions crates/datasources/src/debug/errors.rs
@@ -8,4 +8,5 @@ pub enum DebugError {
     InvalidTunnel(String),
 }
 
+
 pub type Result<T, E = DebugError> = std::result::Result<T, E>;
1 change: 1 addition & 0 deletions crates/datasources/src/lib.rs
@@ -1,4 +1,5 @@
 //! Data source implementations.
+
 pub mod bigquery;
 pub mod bson;
 pub mod cassandra;
24 changes: 10 additions & 14 deletions crates/datasources/src/native/access.rs
@@ -30,7 +30,7 @@ use object_store::prefix::PrefixStore;
 use object_store::ObjectStore;
 use object_store_util::shared::SharedObjectStore;
 use protogen::metastore::types::catalog::TableEntry;
-use protogen::metastore::types::options::{TableOptions, TableOptionsInternal};
+use protogen::metastore::types::options::{TableOptionsInternal, TableOptionsV0};
 use serde_json::{json, Value};
 use url::Url;
 use uuid::Uuid;
@@ -262,12 +262,11 @@ impl NativeTableStorage {
         Ok(x.next().await.is_some())
     }
 
-    fn opts_from_ent(table: &TableEntry) -> Result<&TableOptionsInternal> {
-        let opts = match &table.options {
-            TableOptions::Internal(opts) => opts,
-            _ => return Err(NativeError::NotNative(table.clone())),
-        };
-        Ok(opts)
+    fn opts_from_ent(table: &TableEntry) -> Result<TableOptionsInternal> {
+        match table.options {
+            TableOptionsV0::Internal(ref opts) => Ok(opts.clone()),
+            _ => Err(NativeError::NotNative(table.clone())),
+        }
     }
 
     fn create_delta_store_for_table(&self, table: &TableEntry) -> Arc<dyn LogStore> {
@@ -498,11 +497,7 @@ mod tests {
     use deltalake::protocol::SaveMode;
     use object_store_util::conf::StorageConfig;
     use protogen::metastore::types::catalog::{EntryMeta, EntryType, SourceAccessMode, TableEntry};
-    use protogen::metastore::types::options::{
-        InternalColumnDefinition,
-        TableOptions,
-        TableOptionsInternal,
-    };
+    use protogen::metastore::types::options::{InternalColumnDefinition, TableOptionsInternal};
     use tempfile::tempdir;
     use url::Url;
     use uuid::Uuid;
@@ -533,13 +528,14 @@ mod tests {
                 external: false,
                 is_temp: false,
             },
-            options: TableOptions::Internal(TableOptionsInternal {
+            options: TableOptionsInternal {
                 columns: vec![InternalColumnDefinition {
                     name: "id".to_string(),
                     nullable: true,
                     arrow_type: DataType::Int32,
                 }],
-            }),
+            }
+            .into(),
             tunnel_id: None,
             access_mode: SourceAccessMode::ReadOnly,
             columns: None,
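Note the signature change in `opts_from_ent`: it now returns an owned `TableOptionsInternal` (via `clone`) rather than a reference into the `TableEntry`, so the result no longer borrows the entry. A stripped-down illustration with stand-in types, using `Option` where the crate uses its own `Result`:

```rust
#[derive(Debug, Clone)]
struct TableOptionsInternal {
    columns: Vec<String>, // stand-in for the real column definitions
}

#[derive(Debug, Clone)]
#[allow(dead_code)]
enum TableOptionsV0 {
    Internal(TableOptionsInternal),
    Debug(String),
}

struct TableEntry {
    options: TableOptionsV0,
}

fn opts_from_ent(table: &TableEntry) -> Option<TableOptionsInternal> {
    match table.options {
        // `ref` binds by reference (no move out of the borrowed entry);
        // `clone` then hands back an owned value, keeping lifetimes out of
        // the return type.
        TableOptionsV0::Internal(ref opts) => Some(opts.clone()),
        _ => None,
    }
}

fn main() {
    let entry = TableEntry {
        options: TableOptionsV0::Internal(TableOptionsInternal { columns: vec![] }),
    };
    let opts = opts_from_ent(&entry);
    drop(entry); // fine: `opts` is owned, nothing still borrows `entry`
    println!("{opts:?}");
}
```
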
44 changes: 22 additions & 22 deletions crates/datasources/src/object_store/mod.rs
@@ -25,7 +25,7 @@ use futures::StreamExt;
 use glob::{MatchOptions, Pattern};
 use object_store::path::Path as ObjectStorePath;
 use object_store::{ObjectMeta, ObjectStore};
-use protogen::metastore::types::options::{TableOptions, TableOptionsObjectStore};
+use protogen::metastore::types::options::{TableOptionsObjectStore, TableOptionsV0};
 
 use self::azure::AzureStoreAccess;
 use self::glob_util::{get_resolved_patterns, ResolvedPattern};
@@ -380,49 +380,49 @@ pub fn file_type_from_path(path: &ObjectStorePath) -> Result<FileType> {
 
 pub fn init_session_registry<'a>(
     runtime: &RuntimeEnv,
-    entries: impl Iterator<Item = &'a TableOptions>,
+    entries: impl Iterator<Item = &'a TableOptionsV0>,
 ) -> Result<()> {
     for opts in entries {
         let access: Arc<dyn ObjStoreAccess> = match opts {
-            TableOptions::Local(_) => Arc::new(LocalStoreAccess),
+            TableOptionsV0::Local(_) => Arc::new(LocalStoreAccess),
             // TODO: Consider consolidating Gcs, S3 and Delta and Iceberg `TableOptions` and
             // `ObjStoreAccess` since they largely overlap
-            TableOptions::Gcs(opts) => Arc::new(GcsStoreAccess {
+            TableOptionsV0::Gcs(opts) => Arc::new(GcsStoreAccess {
                 bucket: opts.bucket.clone(),
                 service_account_key: opts.service_account_key.clone(),
                 opts: HashMap::new(),
             }),
-            TableOptions::S3(opts) => Arc::new(S3StoreAccess {
+            TableOptionsV0::S3(opts) => Arc::new(S3StoreAccess {
                 bucket: opts.bucket.clone(),
                 region: Some(opts.region.clone()),
                 access_key_id: opts.access_key_id.clone(),
                 secret_access_key: opts.secret_access_key.clone(),
                 opts: HashMap::new(),
             }),
-            TableOptions::Azure(TableOptionsObjectStore {
+            TableOptionsV0::Azure(TableOptionsObjectStore {
                 location,
                 storage_options,
                 ..
             }) => {
                 let uri = DatasourceUrl::try_new(location)?;
                 Arc::new(AzureStoreAccess::try_from_uri(&uri, storage_options)?)
             }
-            TableOptions::Delta(TableOptionsObjectStore {
+            TableOptionsV0::Delta(TableOptionsObjectStore {
                 location,
                 storage_options,
                 ..
             })
-            | TableOptions::Iceberg(TableOptionsObjectStore {
+            | TableOptionsV0::Iceberg(TableOptionsObjectStore {
                 location,
                 storage_options,
                 ..
             })
-            | TableOptions::Lance(TableOptionsObjectStore {
+            | TableOptionsV0::Lance(TableOptionsObjectStore {
                 location,
                 storage_options,
                 ..
             })
-            | TableOptions::Bson(TableOptionsObjectStore {
+            | TableOptionsV0::Bson(TableOptionsObjectStore {
                 location,
                 storage_options,
                 ..
@@ -434,18 +434,18 @@
             // Continue on all others. Explicitly mentioning all the left
             // over options so we don't forget adding object stores that are
            // supported in the future (like azure).
-            TableOptions::Internal(_)
-            | TableOptions::Debug(_)
-            | TableOptions::Postgres(_)
-            | TableOptions::BigQuery(_)
-            | TableOptions::Mysql(_)
-            | TableOptions::MongoDb(_)
-            | TableOptions::Snowflake(_)
-            | TableOptions::SqlServer(_)
-            | TableOptions::Clickhouse(_)
-            | TableOptions::Cassandra(_)
-            | TableOptions::Excel(_)
-            | TableOptions::Sqlite(_) => continue,
+            TableOptionsV0::Internal(_)
+            | TableOptionsV0::Debug(_)
+            | TableOptionsV0::Postgres(_)
+            | TableOptionsV0::BigQuery(_)
+            | TableOptionsV0::Mysql(_)
+            | TableOptionsV0::MongoDb(_)
+            | TableOptionsV0::Snowflake(_)
+            | TableOptionsV0::SqlServer(_)
+            | TableOptionsV0::Clickhouse(_)
+            | TableOptionsV0::Cassandra(_)
+            | TableOptionsV0::Excel(_)
+            | TableOptionsV0::Sqlite(_) => continue,
         };
 
         let base_url = access.base_url()?;
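As the in-code comment says, the match deliberately spells out every non-object-store variant instead of using a `_` wildcard, so adding a new `TableOptionsV0` variant breaks compilation here until someone decides how it should be handled. A minimal illustration of that pattern, with invented variants:

```rust
#[allow(dead_code)]
enum TableOptionsV0 {
    Local,
    Gcs,
    Internal,
    Postgres,
}

fn needs_object_store(opts: &TableOptionsV0) -> bool {
    match opts {
        TableOptionsV0::Local | TableOptionsV0::Gcs => true,
        // No `_` arm: adding a variant to the enum is a compile error here
        // until it is added to one of these arms.
        TableOptionsV0::Internal | TableOptionsV0::Postgres => false,
    }
}

fn main() {
    assert!(needs_object_store(&TableOptionsV0::Gcs));
    assert!(!needs_object_store(&TableOptionsV0::Internal));
}
```
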
37 changes: 37 additions & 0 deletions crates/glaredb/tests/catalog_compat.rs
@@ -0,0 +1,37 @@
+mod setup;
+use crate::setup::make_cli;
+
+
+#[test]
+/// Assert that we can still load the old catalog without panicking
+fn test_catalog_backwards_compat() {
+    let mut cmd = make_cli();
+    let pwd = std::env::current_dir().unwrap();
+    let root_dir = pwd.parent().unwrap().parent().unwrap();
+    let old_catalog = root_dir.join("testdata/catalog_compat/v0");
+    cmd.args(["-l", old_catalog.to_str().unwrap()])
+        .assert()
+        .success();
+}
+
+
+#[test]
+/// Make sure that we can read the table options from the old catalog
+/// The v0 catalog has a table created from the following SQL:
+/// ```sql
+/// CREATE EXTERNAL TABLE debug_table
+///     FROM debug OPTIONS (
+///         table_type 'never_ending'
+///     );
+/// ```
+fn test_catalog_backwards_compat_tbl_options() {
+    let mut cmd = make_cli();
+    let pwd = std::env::current_dir().unwrap();
+    let root_dir = pwd.parent().unwrap().parent().unwrap();
+    let old_catalog = root_dir.join("testdata/catalog_compat/v0");
+
+    let query = "SELECT * FROM debug_table LIMIT 1";
+    cmd.args(["-l", old_catalog.to_str().unwrap(), "-q", query])
+        .assert()
+        .success();
+}
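Both tests lean on a `make_cli()` helper from the `setup` module, which is not part of this diff (the `-l` flag evidently points the CLI at a catalog location, and `-q` runs a query against it). A plausible sketch of the helper, assuming the tests are built on the assert_cmd crate; this is a guess, not GlareDB's actual code:

```rust
// Hypothetical contents of tests/setup.rs; assumes the assert_cmd crate.
use assert_cmd::Command;

pub fn make_cli() -> Command {
    // Builds (if needed) and locates the `glaredb` binary target, so each
    // test drives the real CLI end to end.
    Command::cargo_bin("glaredb").expect("glaredb binary should build")
}
```
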
1 change: 0 additions & 1 deletion crates/metastore/src/client.rs
@@ -359,7 +359,6 @@ impl StatefulWorker {
                 .map_err(CatalogError::from),
             Err(e) => Err(CatalogError::new(e.to_string())),
         };
-
         let result = match result {
             Ok(resp) => {
                 let resp = resp.into_inner();