refactor: table options #2767

Merged Mar 27, 2024 · 47 commits

Commits (all by universalmind303)

1cd4172  Mar 6, 2024   refactor: move parser into own crate
e154cbf  Mar 6, 2024   refactor: move parser into own crate
b80aff8  Mar 6, 2024   Merge branch 'main' into universalmind303/extract-parser
1a2a931  Mar 6, 2024   remove main
fec18cc  Mar 6, 2024   Merge branch 'universalmind303/extract-parser' of github.com:GlareDB/…
e9ecd2a  Mar 6, 2024   cleanup
2849edc  Mar 6, 2024   pr feedback
04c88c8  Mar 6, 2024   refactor: remove proptests (#2748)
932884e  Mar 7, 2024   wip: table options refactor
2652d61  Mar 7, 2024   wip
bb0ce70  Mar 8, 2024   wip
d3f03c4  Mar 8, 2024   wip
4335900  Mar 8, 2024   i think first pass is done
ace5fc1  Mar 8, 2024   wip
0f19436  Mar 8, 2024   get all tests passing, TODO: external schema
2502a61  Mar 11, 2024  add some docstrings
e85f623  Mar 11, 2024  remove validation from datasource
305a74f  Mar 11, 2024  fmt
d3a4c7c  Mar 11, 2024  fix the "columns" stuff
7ca5256  Mar 12, 2024  refactor(table options): serde (#2770)
5052e92  Mar 12, 2024  Merge branch 'universalmind303/table-options-refactor' of github.com:…
6e8d7c0  Mar 12, 2024  Merge branch 'main' of github.com:GlareDB/glaredb into universalmind3…
c46933d  Mar 12, 2024  pr feedback
c056a0e  Mar 12, 2024  get schema stuff working
4935cc3  Mar 12, 2024  refactor(table options): backwards compat for catalog (#2775)
bc17867  Mar 12, 2024  fmt
d9baec8  Mar 12, 2024  Merge branch 'universalmind303/table-options-refactor' of github.com:…
ae2c21f  Mar 14, 2024  wip
d3b03be  Mar 14, 2024  remove "datasource" stuff
691980e  Mar 14, 2024  revert more code
0e42b84  Mar 14, 2024  revert more code
8a62643  Mar 14, 2024  revert more code
2c178e0  Mar 14, 2024  clippy
ddf88ce  Mar 14, 2024  cleanup
918b94e  Mar 14, 2024  Merge branch 'main' into universalmind303/table-options-refactor
49494ca  Mar 14, 2024  cleanup
1fb7a26  Mar 14, 2024  revert more code
b243e41  Mar 14, 2024  Merge branch 'main' into universalmind303/table-options-refactor
5604c58  Mar 14, 2024  revert more code
93ea929  Mar 14, 2024  Merge branch 'universalmind303/table-options-refactor' of github.com:…
b7b6a58  Mar 14, 2024  tbl opts
45cf0dc  Mar 14, 2024  fixes
eefa93e  Mar 15, 2024  Merge branch 'main' of github.com:GlareDB/glaredb into universalmind3…
0513e84  Mar 15, 2024  pr feedback
9890312  Mar 15, 2024  pr feedback
c72af53  Mar 15, 2024  clippy
3bf7d5f  Mar 27, 2024  merge from main
4 changes: 4 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

19 changes: 7 additions & 12 deletions crates/catalog/src/session_catalog.rs
@@ -18,11 +18,7 @@ use protogen::metastore::types::catalog::{
     TableEntry,
     TunnelEntry,
 };
-use protogen::metastore::types::options::{
-    InternalColumnDefinition,
-    TableOptions,
-    TableOptionsInternal,
-};
+use protogen::metastore::types::options::{InternalColumnDefinition, TableOptionsInternal};
 use tracing::debug;
 
 use super::client::MetastoreClientHandle;
@@ -461,12 +457,10 @@ impl TempCatalog {
                     external: false,
                     is_temp: true,
                 },
-                options: TableOptions::Internal(TableOptionsInternal {
-                    columns: columns.to_owned(),
-                }),
+                options: TableOptionsInternal { columns }.into(),
                 tunnel_id: None,
                 access_mode: SourceAccessMode::ReadWrite,
-                columns: Some(columns.to_owned()),
+                schema: Some(schema.as_ref().clone()),
             }
         })
     }
@@ -505,12 +499,13 @@ impl TempCatalog {
                 external: false,
                 is_temp: true,
             },
-            options: TableOptions::Internal(TableOptionsInternal {
+            options: TableOptionsInternal {
                 columns: Vec::new(),
-            }),
+            }
+            .into(),
            tunnel_id: None,
            access_mode: SourceAccessMode::ReadWrite,
-            columns: Some(Vec::new()),
+            schema: None,
        });
    }
 
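The `.into()` call sites above rely on each concrete options struct converting into the unified `TableOptions` wrapper. Below is a minimal, self-contained sketch of that machinery; it is an assumption based on this diff and the serde commit (#2770), not the PR's actual definitions, and `columns` is simplified to plain strings:

```rust
use serde::{Deserialize, Serialize};

// Stand-in for the real struct; the actual `columns` field holds
// `InternalColumnDefinition`s.
#[derive(Serialize, Deserialize)]
pub struct TableOptionsInternal {
    pub columns: Vec<String>,
}

// Hypothetical wrapper: a datasource name plus the serialized options body.
pub struct TableOptions {
    pub name: &'static str,
    pub options: serde_json::Value,
}

// Enables the `TableOptionsInternal { columns }.into()` call sites above.
impl From<TableOptionsInternal> for TableOptions {
    fn from(v: TableOptionsInternal) -> Self {
        TableOptions {
            name: "internal",
            options: serde_json::to_value(&v).expect("options should serialize"),
        }
    }
}

impl TableOptions {
    // Counterpart used elsewhere in this PR (`options.extract()?`): recover
    // the concrete options struct from the stored body.
    pub fn extract<T: for<'de> Deserialize<'de>>(&self) -> serde_json::Result<T> {
        serde_json::from_value(self.options.clone())
    }
}
```
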
10 changes: 5 additions & 5 deletions crates/datasources/src/bson/table.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
 
 use bson::RawDocumentBuf;
 use bytes::BytesMut;
-use datafusion::arrow::datatypes::{FieldRef, Schema};
+use datafusion::arrow::datatypes::Schema;
 use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::TableProvider;
 use datafusion::parquet::data_type::AsBytes;
@@ -20,12 +20,12 @@ use crate::object_store::{ObjStoreAccess, ObjStoreAccessor};
 pub async fn bson_streaming_table(
     store_access: Arc<dyn ObjStoreAccess>,
     source_url: DatasourceUrl,
-    fields: Option<Vec<FieldRef>>,
+    schema: Option<Schema>,
     schema_inference_sample_size: Option<i64>,
 ) -> Result<Arc<dyn TableProvider>, BsonError> {
     // TODO: set a maximum (1024?) or have an adaptive mode
     // (at least n but stop after n the same) or skip documents
-    let sample_size = if fields.is_some() {
+    let sample_size = if schema.is_some() {
         0
     } else {
         schema_inference_sample_size.unwrap_or(100)
@@ -93,8 +93,8 @@ pub async fn bson_streaming_table(
     let mut streams = Vec::<Arc<(dyn PartitionStream + 'static)>>::with_capacity(readers.len() + 1);
 
     // get the schema; if provided as an argument, just use that, otherwise, sample.
-    let schema = if let Some(fields) = fields {
-        Arc::new(Schema::new(fields))
+    let schema = if let Some(schema) = schema {
+        Arc::new(schema)
     } else {
         // iterate through the readers and build up a sample of the first <n>
         // documents to be used to infer the schema.
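A hypothetical call site for the new signature: passing a complete `Schema` (instead of the old `Option<Vec<FieldRef>>`) skips sampling entirely, since `sample_size` is forced to 0 when a schema is supplied. The crate-local import paths below are assumptions; only `bson_streaming_table`'s signature comes from the diff.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::TableProvider;

use crate::bson::errors::BsonError; // path assumed
use crate::bson::table::bson_streaming_table; // path assumed
use crate::common::url::DatasourceUrl; // path assumed
use crate::object_store::ObjStoreAccess;

/// Build a BSON-backed table whose layout is already known, so no documents
/// are sampled for schema inference.
async fn bson_table_with_known_schema(
    store_access: Arc<dyn ObjStoreAccess>,
    source_url: DatasourceUrl,
) -> Result<Arc<dyn TableProvider>, BsonError> {
    let schema = Schema::new(vec![
        Field::new("_id", DataType::Utf8, false),
        Field::new("value", DataType::Int64, true),
    ]);
    // The sample-size argument is ignored when a schema is provided.
    bson_streaming_table(store_access, source_url, Some(schema), None).await
}
```
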
1 change: 1 addition & 0 deletions crates/datasources/src/debug/errors.rs
@@ -8,4 +8,5 @@ pub enum DebugError {
     InvalidTunnel(String),
 }
 
+
 pub type Result<T, E = DebugError> = std::result::Result<T, E>;
182 changes: 50 additions & 132 deletions crates/datasources/src/debug/mod.rs
@@ -1,23 +1,15 @@
 //! A collection of debug datasources.
 pub mod errors;
+pub mod options;
 
 use std::any::Any;
-use std::fmt;
 use std::pin::Pin;
-use std::str::FromStr;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 use datafusion::arrow::array::Int32Array;
-use datafusion::arrow::datatypes::{
-    DataType,
-    Field,
-    Fields,
-    Schema as ArrowSchema,
-    SchemaRef as ArrowSchemaRef,
-};
-use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::datatypes::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result as DatafusionResult};
@@ -37,52 +29,12 @@ use datafusion_ext::errors::ExtensionError;
 use datafusion_ext::functions::VirtualLister;
 use errors::DebugError;
 use futures::Stream;
-use parser::errors::ParserError;
-use parser::options::{OptionValue, ParseOptionValue};
-use protogen::metastore::types::options::TunnelOptions;
-use serde::{Deserialize, Serialize};
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum DebugTableType {
-    /// A table that will always return an error on the record batch stream.
-    ErrorDuringExecution,
-    /// A table that never stops sending record batches.
-    NeverEnding,
-}
-
-impl ParseOptionValue<DebugTableType> for OptionValue {
-    fn parse_opt(self) -> Result<DebugTableType, ParserError> {
-        let opt = match self {
-            Self::QuotedLiteral(s) | Self::UnquotedLiteral(s) => s
-                .parse()
-                .map_err(|e: DebugError| ParserError::ParserError(e.to_string()))?,
-            o => {
-                return Err(ParserError::ParserError(format!(
-                    "Expected a string, got: {}",
-                    o
-                )))
-            }
-        };
-        Ok(opt)
-    }
-}
-impl fmt::Display for DebugTableType {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{}", self.as_str())
-    }
-}
+use parser::options::StatementOptions;
+use protogen::metastore::types::options::{CredentialsOptions, TableOptions, TunnelOptions};
 
-impl FromStr for DebugTableType {
-    type Err = DebugError;
+pub use self::options::{DebugTableType, TableOptionsDebug};
+use crate::DatasourceError;
 
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        Ok(match s {
-            "error_during_execution" => DebugTableType::ErrorDuringExecution,
-            "never_ending" => DebugTableType::NeverEnding,
-            other => return Err(DebugError::UnknownDebugTableType(other.to_string())),
-        })
-    }
-}
 
 /// Validates if the tunnel is supported and returns whether a tunnel is going
 /// to be used or not for the connection.
@@ -96,82 +48,6 @@ pub fn validate_tunnel_connections(
     }
 }
 
-impl DebugTableType {
-    /// Get the arrow schema for the debug table type.
-    pub fn arrow_schema(&self) -> ArrowSchema {
-        match self {
-            DebugTableType::ErrorDuringExecution => {
-                ArrowSchema::new(vec![Field::new("a", DataType::Int32, false)])
-            }
-            DebugTableType::NeverEnding => ArrowSchema::new(vec![
-                Field::new("a", DataType::Int32, false),
-                Field::new("b", DataType::Int32, false),
-                Field::new("c", DataType::Int32, false),
-            ]),
-        }
-    }
-
-    /// Get the projected arrow schema.
-    pub fn projected_arrow_schema(
-        &self,
-        projection: Option<&Vec<usize>>,
-    ) -> ArrowResult<ArrowSchema> {
-        match projection {
-            Some(proj) => self.arrow_schema().project(proj),
-            None => Ok(self.arrow_schema()),
-        }
-    }
-
-    /// Produces a record batch that matches this debug table's schema.
-    pub fn record_batch(&self, tunnel: bool) -> RecordBatch {
-        let base = if tunnel { 10_i32 } else { 1_i32 };
-        match self {
-            DebugTableType::ErrorDuringExecution => RecordBatch::try_new(
-                Arc::new(self.arrow_schema()),
-                vec![Arc::new(Int32Array::from_value(base, 30))],
-            )
-            .unwrap(),
-            DebugTableType::NeverEnding => RecordBatch::try_new(
-                Arc::new(self.arrow_schema()),
-                vec![
-                    Arc::new(Int32Array::from_value(base, 30)),
-                    Arc::new(Int32Array::from_value(base * 2, 30)),
-                    Arc::new(Int32Array::from_value(base * 3, 30)),
-                ],
-            )
-            .unwrap(),
-        }
-    }
-
-    /// Get a projected record batch for this debug table type.
-    pub fn projected_record_batch(
-        &self,
-        tunnel: bool,
-        projection: Option<&Vec<usize>>,
-    ) -> ArrowResult<RecordBatch> {
-        match projection {
-            Some(proj) => self.record_batch(tunnel).project(proj),
-            None => Ok(self.record_batch(tunnel)),
-        }
-    }
-
-    pub fn as_str(&self) -> &'static str {
-        match self {
-            DebugTableType::ErrorDuringExecution => "error_during_execution",
-            DebugTableType::NeverEnding => "never_ending",
-        }
-    }
-
-    pub fn into_table_provider(
-        self,
-        tunnel_opts: Option<&TunnelOptions>,
-    ) -> Arc<dyn TableProvider> {
-        let tunnel = validate_tunnel_connections(tunnel_opts)
-            .expect("datasources should be validated with tunnels before dispatch");
-        Arc::new(DebugTableProvider { typ: self, tunnel })
-    }
-}
 
 pub struct DebugVirtualLister;
 
 #[async_trait]
@@ -202,8 +78,8 @@ impl VirtualLister for DebugVirtualLister {
 }
 
 pub struct DebugTableProvider {
-    typ: DebugTableType,
-    tunnel: bool,
+    pub typ: DebugTableType,
+    pub tunnel: bool,
 }
 
 #[async_trait]
@@ -369,3 +245,45 @@ impl RecordBatchStream for NeverEndingStream {
         self.batch.schema()
     }
 }
+pub struct DebugDatasource;
+
+
+#[async_trait]
+impl crate::Datasource for DebugDatasource {
+    fn name(&self) -> &'static str {
+        "debug"
+    }
+
+    fn table_options_from_stmt(
+        &self,
+        opts: &mut StatementOptions,
+        creds: Option<CredentialsOptions>,
+        tunnel_opts: Option<TunnelOptions>,
+    ) -> Result<TableOptions, DatasourceError> {
+        validate_tunnel_connections(tunnel_opts.as_ref()).unwrap();
+
+        let typ: Option<DebugTableType> = match creds {
+            Some(CredentialsOptions::Debug(c)) => c.table_type.parse().ok(),
+            Some(other) => unreachable!("invalid credentials {other} for debug datasource"),
+            None => None,
+        };
+        let typ: DebugTableType = opts
+            .remove_required_or("table_type", typ)
+            .map_err(|e| DebugError::UnknownDebugTableType(e.to_string()))?;
+
+        Ok(TableOptionsDebug { table_type: typ }.into())
+    }
+
+    async fn create_table_provider(
+        &self,
+        options: &TableOptions,
+        tunnel_opts: Option<&TunnelOptions>,
+    ) -> Result<Arc<dyn TableProvider>, DatasourceError> {
+        let options: TableOptionsDebug = options.extract()?;
+
+        Ok(Arc::new(DebugTableProvider {
+            typ: options.table_type,
+            tunnel: tunnel_opts.is_some(),
+        }))
+    }
+}
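
For reference, a sketch of the `crate::Datasource` trait that `DebugDatasource` implements above, reconstructed from this impl alone; the actual definition in the PR may differ (e.g. in bounds or extra methods):

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use parser::options::StatementOptions;
use protogen::metastore::types::options::{CredentialsOptions, TableOptions, TunnelOptions};

use crate::DatasourceError;

/// Reconstructed trait: one entry point per datasource, so dispatch code can
/// map a datasource name to option parsing and table-provider creation.
#[async_trait]
pub trait Datasource: Send + Sync {
    /// Identifier used to route statements to this datasource (e.g. "debug").
    fn name(&self) -> &'static str;

    /// Parse statement options (plus optional credentials and tunnel) into
    /// this source's `TableOptions`.
    fn table_options_from_stmt(
        &self,
        opts: &mut StatementOptions,
        creds: Option<CredentialsOptions>,
        tunnel_opts: Option<TunnelOptions>,
    ) -> Result<TableOptions, DatasourceError>;

    /// Turn previously stored options back into a DataFusion `TableProvider`.
    async fn create_table_provider(
        &self,
        options: &TableOptions,
        tunnel_opts: Option<&TunnelOptions>,
    ) -> Result<Arc<dyn TableProvider>, DatasourceError>;
}
```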