diff --git a/Cargo.lock b/Cargo.lock index 59c933d9e..30518a02f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2436,6 +2436,7 @@ dependencies = [ "env_logger", "futures", "ioutil", + "object_store", "once_cell", "parking_lot", "paste", @@ -7873,6 +7874,7 @@ dependencies = [ "siphasher 1.0.0", "strum 0.25.0", "telemetry", + "tempfile", "thiserror", "tokio", "tracing", diff --git a/crates/datafusion_ext/Cargo.toml b/crates/datafusion_ext/Cargo.toml index 7110b415a..d9b738d8d 100644 --- a/crates/datafusion_ext/Cargo.toml +++ b/crates/datafusion_ext/Cargo.toml @@ -15,19 +15,20 @@ unicode_expressions = [] ioutil = { path = "../ioutil" } telemetry = { path = "../telemetry" } catalog = { path = "../catalog" } +decimal = { path = "../decimal" } +protogen = { path = "../protogen" } +pgrepr = { path = "../pgrepr" } serde_json = { workspace = true } datafusion = { workspace = true } async-trait = { workspace = true } +object_store = { workspace = true } +tracing = { workspace = true } +thiserror.workspace = true +futures = { workspace = true } async-recursion = "1.0.4" uuid = { version = "1.7.0", features = ["v4", "fast-rng", "macro-diagnostics"] } regex = "1.10" once_cell = "1.19.0" -tracing = { workspace = true } -thiserror.workspace = true -decimal = { path = "../decimal" } -protogen = { path = "../protogen" } -pgrepr = { path = "../pgrepr" } -futures = { workspace = true } parking_lot = "0.12.1" bson = "2.9.0" diff --git a/crates/datafusion_ext/src/errors.rs b/crates/datafusion_ext/src/errors.rs index 8d833838f..c294d48a4 100644 --- a/crates/datafusion_ext/src/errors.rs +++ b/crates/datafusion_ext/src/errors.rs @@ -42,6 +42,12 @@ pub enum ExtensionError { #[error("object store: {0}")] ObjectStore(String), + #[error(transparent)] + ObjectStoreCrate(#[from] object_store::Error), + + #[error(transparent)] + ObjectStorePath(#[from] object_store::path::Error), + #[error("{0}")] String(String), } diff --git a/crates/datasources/src/common/errors.rs b/crates/datasources/src/common/errors.rs index 47aebe2f0..bac408fe0 100644 --- a/crates/datasources/src/common/errors.rs +++ b/crates/datasources/src/common/errors.rs @@ -35,3 +35,9 @@ pub enum DatasourceCommonError { } pub type Result = std::result::Result; + +impl From for datafusion_ext::errors::ExtensionError { + fn from(value: DatasourceCommonError) -> Self { + datafusion_ext::errors::ExtensionError::access(value) + } +} diff --git a/crates/datasources/src/common/url.rs b/crates/datasources/src/common/url.rs index c7b27dbce..a9b841b57 100644 --- a/crates/datasources/src/common/url.rs +++ b/crates/datasources/src/common/url.rs @@ -149,6 +149,7 @@ impl TryFrom<&str> for DatasourceUrl { } } + impl TryFrom for DatasourceUrl { type Error = ExtensionError; @@ -161,6 +162,12 @@ impl TryFrom for DatasourceUrl { } } +impl From for DatasourceUrl { + fn from(path: PathBuf) -> Self { + DatasourceUrl::File(path) + } +} + impl TryFrom for ObjectStoreUrl { type Error = DataFusionError; diff --git a/crates/datasources/src/json/errors.rs b/crates/datasources/src/json/errors.rs index df3363b71..e04fe820f 100644 --- a/crates/datasources/src/json/errors.rs +++ b/crates/datasources/src/json/errors.rs @@ -1,8 +1,6 @@ use datafusion::error::DataFusionError; use datafusion_ext::errors::ExtensionError; -use crate::object_store::errors::ObjectStoreSourceError; - #[derive(Debug, thiserror::Error)] pub enum JsonError { #[error("Unsupported json type: {0}")] @@ -18,7 +16,7 @@ pub enum JsonError { SendAlreadyInProgress, #[error(transparent)] - ObjectStoreSource(#[from] ObjectStoreSourceError), + ObjectStoreSource(#[from] crate::object_store::errors::ObjectStoreSourceError), #[error(transparent)] ObjectStore(#[from] object_store::Error), diff --git a/crates/datasources/src/sqlite/errors.rs b/crates/datasources/src/sqlite/errors.rs index 16d0ed54d..bc7449f46 100644 --- a/crates/datasources/src/sqlite/errors.rs +++ b/crates/datasources/src/sqlite/errors.rs @@ -14,10 +14,13 @@ pub enum SqliteError { MpscSendError(String), #[error(transparent)] - FmtError(#[from] std::fmt::Error), + Fmt(#[from] std::fmt::Error), #[error(transparent)] - DatasourceCommonError(#[from] crate::common::errors::DatasourceCommonError), + DatasourceCommon(#[from] crate::common::errors::DatasourceCommonError), + + #[error("Unimplemented: {0}")] + Unimplemented(&'static str), #[error("Missing data for column {0}")] MissingDataForColumn(usize), @@ -28,11 +31,41 @@ pub enum SqliteError { to: datafusion::arrow::datatypes::DataType, }, + #[error("found {num} objects matching specification '{url}'")] + NoMatchingObjectFound { + url: crate::common::url::DatasourceUrl, + num: usize, + }, + #[error(transparent)] ArrowError(#[from] datafusion::arrow::error::ArrowError), #[error(transparent)] ReprError(#[from] repr::error::ReprError), + + #[error(transparent)] + IoError(#[from] std::io::Error), + + #[error(transparent)] + ObjectStoreSource(#[from] crate::object_store::errors::ObjectStoreSourceError), + + #[error(transparent)] + ObjectStoreError(#[from] object_store::Error), + + #[error(transparent)] + ObjectStorePath(#[from] object_store::path::Error), + + #[error(transparent)] + LakeStorageOptions(#[from] crate::lake::LakeStorageOptionsError), + + #[error(transparent)] + ExtensionError(#[from] datafusion_ext::errors::ExtensionError), } pub type Result = std::result::Result; + +impl From for datafusion_ext::errors::ExtensionError { + fn from(value: SqliteError) -> Self { + datafusion_ext::errors::ExtensionError::access(value) + } +} diff --git a/crates/datasources/src/sqlite/mod.rs b/crates/datasources/src/sqlite/mod.rs index 1bfddab56..b44111902 100644 --- a/crates/datasources/src/sqlite/mod.rs +++ b/crates/datasources/src/sqlite/mod.rs @@ -34,21 +34,101 @@ use datafusion_ext::errors::ExtensionError; use datafusion_ext::functions::VirtualLister; use datafusion_ext::metrics::DataSourceMetricsStreamAdapter; use futures::{StreamExt, TryStreamExt}; +use object_store::ObjectStore; +use protogen::metastore::types::options::StorageOptions; +use uuid::Uuid; -use self::errors::Result; +use self::errors::{Result, SqliteError}; use self::wrapper::SqliteAsyncClient; +use crate::common::url::DatasourceUrl; use crate::common::util::{self, COUNT_SCHEMA}; +use crate::lake::storage_options_into_store_access; +use crate::object_store::ObjStoreAccessor; type DataFusionResult = Result; #[derive(Debug, Clone)] pub struct SqliteAccess { pub db: PathBuf, + pub cache: Option>, } impl SqliteAccess { + pub async fn new(url: DatasourceUrl, opts: Option) -> Result { + match url { + DatasourceUrl::File(ref location) => { + if !location.try_exists()? { + Err(SqliteError::NoMatchingObjectFound { + url: url.clone(), + num: 0, + }) + } else { + Ok(Self { + db: location.clone(), + cache: None, + }) + } + } + DatasourceUrl::Url(_) => { + let storage_options = match opts { + Some(v) => v, + None => { + return Err(SqliteError::Internal( + "storage options are required".to_string(), + )) + } + }; + let store_access = storage_options_into_store_access(&url, &storage_options)?; + + let accessor = ObjStoreAccessor::new(store_access)?; + let mut list = accessor.list_globbed(url.path()).await?; + if list.len() != 1 { + return Err(SqliteError::NoMatchingObjectFound { + url, + num: list.len(), + }); + } + + let store = accessor.into_object_store(); + + let obj = list.pop().unwrap().location; + let payload = store.get(&obj).await?.bytes().await?; + + let tmpdir = Arc::new( + tempfile::Builder::new() + .prefix( + storage_options + .inner + .get("__tmp_prefix") + .map(|i| i.to_owned()) + .unwrap_or_else(|| Uuid::new_v4().to_string()) + .as_str(), + ) + .rand_bytes(8) + .tempdir()?, + ); + + let tmpdir_path = tmpdir.path(); + let local_store = + object_store::local::LocalFileSystem::new_with_prefix(tmpdir_path)?; + + let local_path = + object_store::path::Path::parse(obj.filename().unwrap_or("sqlite"))?; + + local_store.put(&local_path, payload).await?; + + let db = tmpdir_path.join(local_path.filename().unwrap()); + + Ok(Self { + db, + cache: Some(tmpdir.clone()), + }) + } + } + } + pub async fn connect(&self) -> Result { - let client = SqliteAsyncClient::new(self.db.to_path_buf()).await?; + let client = SqliteAsyncClient::new(self.db.to_path_buf(), self.cache.clone()).await?; Ok(SqliteAccessState { client }) } @@ -64,12 +144,17 @@ impl SqliteAccess { } } + #[derive(Clone, Debug)] pub struct SqliteAccessState { client: SqliteAsyncClient, } impl SqliteAccessState { + pub fn is_local_file(&self) -> bool { + self.client.is_local_file() + } + async fn validate_table_access(&self, table: &str) -> Result<()> { let query = format!("SELECT * FROM {table} WHERE FALSE"); let _ = self.client.query_all(query).await?; @@ -301,6 +386,12 @@ impl TableProvider for SqliteTableProvider { return Err(DataFusionError::Execution("cannot overwrite".to_string())); } + if !self.state.is_local_file() { + return Err(DataFusionError::Execution( + "cannot write remote file".to_string(), + )); + } + Ok(Arc::new(SqliteInsertExec { input, table: self.table.to_string(), diff --git a/crates/datasources/src/sqlite/wrapper.rs b/crates/datasources/src/sqlite/wrapper.rs index dc000b846..1b234cd2a 100644 --- a/crates/datasources/src/sqlite/wrapper.rs +++ b/crates/datasources/src/sqlite/wrapper.rs @@ -1,6 +1,7 @@ use std::fmt; use std::path::PathBuf; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use async_sqlite::rusqlite; @@ -10,6 +11,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::DataFusionError; use datafusion::physical_plan::RecordBatchStream; use futures::{Future, FutureExt, Stream}; +use tempfile; use tokio::sync::mpsc; use super::convert::Converter; @@ -19,6 +21,8 @@ use crate::sqlite::errors::Result; pub struct SqliteAsyncClient { path: PathBuf, inner: async_sqlite::Client, + // we're just tying the lifetime of the tempdir to this connection + cache: Option>, } impl fmt::Debug for SqliteAsyncClient { @@ -28,12 +32,13 @@ impl fmt::Debug for SqliteAsyncClient { } impl SqliteAsyncClient { - pub async fn new(path: PathBuf) -> Result { + pub async fn new(path: PathBuf, cache: Option>) -> Result { let inner = async_sqlite::ClientBuilder::new() .path(&path) .open() .await?; - Ok(Self { path, inner }) + + Ok(Self { path, inner, cache }) } /// Query and return a RecordBatchStream for sqlite data. @@ -118,6 +123,10 @@ impl SqliteAsyncClient { }) .await?) } + + pub fn is_local_file(&self) -> bool { + self.cache.is_none() + } } #[derive(Debug, Clone)] diff --git a/crates/protogen/proto/metastore/options.proto b/crates/protogen/proto/metastore/options.proto index 7940af265..95a971f33 100644 --- a/crates/protogen/proto/metastore/options.proto +++ b/crates/protogen/proto/metastore/options.proto @@ -50,7 +50,7 @@ message DatabaseOptions { DatabaseOptionsCassandra cassandra = 11; DatabaseOptionsSqlite sqlite = 12; } - // next: 12 + // next: 13 } message DatabaseOptionsInternal {} @@ -79,6 +79,11 @@ message DatabaseOptionsSqlServer { string connection_string = 1; } +message DatabaseOptionsSqlite { + string location = 1; + StorageOptions storage_options = 2; +} + message DatabaseOptionsClickhouse { string connection_string = 1; } @@ -89,10 +94,6 @@ message DatabaseOptionsCassandra { optional string password = 3; } -message DatabaseOptionsSqlite { - string location = 1; -} - message DatabaseOptionsSnowflake { string account_name = 1; string login_name = 2; @@ -143,7 +144,7 @@ message TableOptions { TableOptionsClickhouse clickhouse = 17; TableOptionsCassandra cassandra = 18; TableOptionsExcel excel = 19; - TableOptionsSqlite sqlite = 20; + TableOptionsObjectStore sqlite = 20; } // next: 21 } @@ -245,6 +246,8 @@ message TableOptionsObjectStore { // (optional) explit schema repeated InternalColumnDefinition columns = 6; + + optional string name = 7; } message TableOptionsSqlServer { @@ -267,11 +270,6 @@ message TableOptionsCassandra { optional string password = 5; } -message TableOptionsSqlite { - string location = 1; - string table = 2; -} - // Tunnel options message TunnelOptions { diff --git a/crates/protogen/src/metastore/types/options.rs b/crates/protogen/src/metastore/types/options.rs index 678b42f45..a91e5ebd4 100644 --- a/crates/protogen/src/metastore/types/options.rs +++ b/crates/protogen/src/metastore/types/options.rs @@ -424,28 +424,6 @@ impl From for options::DatabaseOptionsCassandra { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DatabaseOptionsSqlite { - pub location: String, -} - -impl TryFrom for DatabaseOptionsSqlite { - type Error = ProtoConvError; - fn try_from(value: options::DatabaseOptionsSqlite) -> Result { - Ok(DatabaseOptionsSqlite { - location: value.location, - }) - } -} - -impl From for options::DatabaseOptionsSqlite { - fn from(value: DatabaseOptionsSqlite) -> Self { - options::DatabaseOptionsSqlite { - location: value.location, - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct DatabaseOptionsSnowflake { pub account_name: String, @@ -595,10 +573,9 @@ impl StorageOptions { } } -impl TryFrom for StorageOptions { - type Error = ProtoConvError; - fn try_from(value: options::StorageOptions) -> Result { - Ok(StorageOptions { inner: value.inner }) +impl From for StorageOptions { + fn from(value: options::StorageOptions) -> Self { + StorageOptions { inner: value.inner } } } @@ -608,6 +585,7 @@ impl From for options::StorageOptions { } } + // Table options #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -631,7 +609,7 @@ pub enum TableOptions { Clickhouse(TableOptionsClickhouse), Cassandra(TableOptionsCassandra), Excel(TableOptionsExcel), - Sqlite(TableOptionsSqlite), + Sqlite(TableOptionsObjectStore), } impl TableOptions { @@ -1226,31 +1204,6 @@ impl From for options::TableOptionsCassandra { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct TableOptionsSqlite { - pub location: String, - pub table: String, -} - -impl TryFrom for TableOptionsSqlite { - type Error = ProtoConvError; - fn try_from(value: options::TableOptionsSqlite) -> Result { - Ok(TableOptionsSqlite { - location: value.location, - table: value.table, - }) - } -} - -impl From for options::TableOptionsSqlite { - fn from(value: TableOptionsSqlite) -> Self { - options::TableOptionsSqlite { - location: value.location, - table: value.table, - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct TableOptionsSnowflake { pub account_name: String, @@ -1294,9 +1247,36 @@ impl From for options::TableOptionsSnowflake { } } + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct DatabaseOptionsSqlite { + pub location: String, + pub storage_options: Option, +} + +impl TryFrom for DatabaseOptionsSqlite { + type Error = ProtoConvError; + fn try_from(value: options::DatabaseOptionsSqlite) -> Result { + Ok(DatabaseOptionsSqlite { + location: value.location, + storage_options: value.storage_options.map(|v| v.into()), + }) + } +} + +impl From for options::DatabaseOptionsSqlite { + fn from(value: DatabaseOptionsSqlite) -> Self { + options::DatabaseOptionsSqlite { + location: value.location, + storage_options: value.storage_options.map(|v| v.into()), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct TableOptionsObjectStore { pub location: String, + pub name: Option, pub storage_options: StorageOptions, pub file_type: Option, pub compression: Option, @@ -1308,6 +1288,7 @@ impl TryFrom for TableOptionsObjectStore { type Error = ProtoConvError; fn try_from(value: options::TableOptionsObjectStore) -> Result { Ok(TableOptionsObjectStore { + name: value.name, location: value.location, storage_options: value.storage_options.required("storage_options")?, file_type: value.file_type, @@ -1330,6 +1311,7 @@ impl TryFrom for TableOptionsObjectStore { impl From for options::TableOptionsObjectStore { fn from(value: TableOptionsObjectStore) -> Self { options::TableOptionsObjectStore { + name: value.name, location: value.location, storage_options: Some(value.storage_options.into()), file_type: value.file_type, diff --git a/crates/sqlbuiltins/Cargo.toml b/crates/sqlbuiltins/Cargo.toml index c95a1a615..d0cdc0748 100644 --- a/crates/sqlbuiltins/Cargo.toml +++ b/crates/sqlbuiltins/Cargo.toml @@ -20,8 +20,9 @@ async-trait = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } object_store = { workspace = true } -thiserror.workspace = true +thiserror = { workspace = true } tracing = { workspace = true } +tempfile = { workspace = true } tokio = { workspace = true } reqwest = { workspace = true } async-openai = "0.19.1" diff --git a/crates/sqlbuiltins/src/functions/table/sqlite.rs b/crates/sqlbuiltins/src/functions/table/sqlite.rs index 682528d37..85ca8d3c6 100644 --- a/crates/sqlbuiltins/src/functions/table/sqlite.rs +++ b/crates/sqlbuiltins/src/functions/table/sqlite.rs @@ -7,10 +7,11 @@ use datafusion::datasource::TableProvider; use datafusion::logical_expr::{Signature, Volatility}; use datafusion_ext::errors::{ExtensionError, Result}; use datafusion_ext::functions::{FuncParamValue, IdentValue, TableFuncContextProvider}; +use datasources::common::url::DatasourceUrl; use datasources::sqlite::{SqliteAccess, SqliteTableProvider}; use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; -use super::TableFunc; +use super::{table_location_and_opts, TableFunc}; use crate::functions::ConstBuiltinFunction; #[derive(Debug, Clone, Copy)] @@ -42,26 +43,47 @@ impl TableFunc for ReadSqlite { async fn create_provider( &self, - _: &dyn TableFuncContextProvider, - args: Vec, - _opts: HashMap, + ctx: &dyn TableFuncContextProvider, + mut args: Vec, + mut opts: HashMap, ) -> Result> { + // NOTE: the semantics of this are read_sqlite(, + // ) when there are options, the table name is still + // last. This "feels" wrong to me, but + // also feels wrong. Putting the table name first feels more + // consistent between the options, but would require breaking + // the original API. match args.len() { - 2 => { - let mut args = args.into_iter(); - let location: String = args.next().unwrap().try_into()?; - let table: IdentValue = args.next().unwrap().try_into()?; + 0 | 1 => Err(ExtensionError::InvalidNumArgs), + 2 | 3 => { + let table: IdentValue = args.pop().unwrap().try_into()?; + let (source_url, mut storage_options) = + table_location_and_opts(ctx, args, &mut opts)?; + let opts = match source_url.clone() { + DatasourceUrl::File(_) => None, + DatasourceUrl::Url(_) => { + let session = ctx.get_session_vars(); + storage_options.inner.insert( + "__tmp_prefix".to_string(), + [ + // TODO this path is too long + session.user_name().as_str(), + &session.database_name(), + &session.connection_id().to_string(), + ] + .join("") + .to_string(), + ); - let access = SqliteAccess { - db: location.into(), + Some(storage_options) + } }; - let state = access.connect().await.map_err(ExtensionError::access)?; - let provider = SqliteTableProvider::try_new(state, table) - .await - .map_err(ExtensionError::access)?; - Ok(Arc::new(provider)) + + let state = SqliteAccess::new(source_url, opts).await?.connect().await?; + + Ok(Arc::new(SqliteTableProvider::try_new(state, table).await?)) } - _ => Err(ExtensionError::InvalidNumArgs), + _ => Err(ExtensionError::String("invalid number of args".to_string())), } } } diff --git a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs index 35b7de337..dfaef598e 100644 --- a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs +++ b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs @@ -37,6 +37,7 @@ use protogen::metastore::types::options::{ DatabaseOptionsSqlServer, DatabaseOptionsSqlite, }; +use uuid::Uuid; use super::TableFunc; use crate::functions::ConstBuiltinFunction; @@ -384,11 +385,22 @@ pub(crate) async fn get_virtual_lister_for_external_db( .map_err(ExtensionError::access)?; Box::new(state) } - DatabaseOptions::Sqlite(DatabaseOptionsSqlite { location }) => { - let access = SqliteAccess { - db: location.into(), + DatabaseOptions::Sqlite(DatabaseOptionsSqlite { + location, + storage_options, + }) => { + let storage_options = match storage_options.clone() { + Some(mut opts) => { + opts.inner + .insert("__tmp_prefix".to_string(), Uuid::new_v4().to_string()); + Some(opts) + } + None => None, }; - let state = access.connect().await.map_err(ExtensionError::access)?; + let state = SqliteAccess::new(location.as_str().try_into()?, storage_options.clone()) + .await? + .connect() + .await?; Box::new(state) } DatabaseOptions::Delta(_) => { diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs index 4cc0f516a..300071299 100644 --- a/crates/sqlexec/src/dispatch/external.rs +++ b/crates/sqlexec/src/dispatch/external.rs @@ -68,11 +68,11 @@ use protogen::metastore::types::options::{ TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, - TableOptionsSqlite, TunnelOptions, }; use sqlbuiltins::builtins::DEFAULT_CATALOG; use sqlbuiltins::functions::FunctionRegistry; +use uuid::Uuid; use super::{DispatchError, Result}; @@ -278,11 +278,15 @@ impl<'a> ExternalDispatcher<'a> { .await?; Ok(Arc::new(table)) } - DatabaseOptions::Sqlite(DatabaseOptionsSqlite { location }) => { - let access = SqliteAccess { - db: location.into(), - }; - let state = access.connect().await?; + DatabaseOptions::Sqlite(DatabaseOptionsSqlite { + location, + storage_options, + }) => { + let state = + SqliteAccess::new(location.as_str().try_into()?, storage_options.to_owned()) + .await? + .connect() + .await?; let table = SqliteTableProvider::try_new(state, name).await?; Ok(Arc::new(table)) } @@ -605,13 +609,26 @@ impl<'a> ExternalDispatcher<'a> { Ok(Arc::new(table)) } - TableOptions::Sqlite(TableOptionsSqlite { location, table }) => { - let access = SqliteAccess { - db: location.into(), - }; - let state = access.connect().await?; - let table = SqliteTableProvider::try_new(state, table).await?; - Ok(Arc::new(table)) + TableOptions::Sqlite(TableOptionsObjectStore { + location, + storage_options, + name, + .. + }) => { + let mut storage_options = storage_options.to_owned(); + + storage_options + .inner + .insert("__tmp_prefix".to_string(), Uuid::new_v4().to_string()); + + let table = name.clone().ok_or(DispatchError::MissingTable)?; + let state = + SqliteAccess::new(location.as_str().try_into()?, Some(storage_options.clone())) + .await? + .connect() + .await?; + + Ok(Arc::new(SqliteTableProvider::try_new(state, table).await?)) } } } diff --git a/crates/sqlexec/src/dispatch/mod.rs b/crates/sqlexec/src/dispatch/mod.rs index c82d9978e..8817cb6d9 100644 --- a/crates/sqlexec/src/dispatch/mod.rs +++ b/crates/sqlexec/src/dispatch/mod.rs @@ -38,6 +38,9 @@ pub enum DispatchError { #[error("Missing temp table: {name}")] MissingTempTable { name: String }, + #[error("Missing table")] + MissingTable, + #[error("Missing object with oid: {0}")] MissingObjectWithOid(u32), diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index ea06c84ba..f59b9df4c 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::str::FromStr; use std::sync::Arc; @@ -39,7 +39,6 @@ use datasources::object_store::s3::S3StoreAccess; use datasources::object_store::{file_type_from_path, ObjStoreAccess, ObjStoreAccessor}; use datasources::postgres::{PostgresAccess, PostgresDbConnection}; use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection, SnowflakeTableAccess}; -use datasources::sqlite::SqliteAccess; use datasources::sqlserver::SqlServerAccess; use object_store::aws::AmazonS3ConfigKey; use object_store::azure::AzureConfigKey; @@ -121,7 +120,6 @@ use protogen::metastore::types::options::{ TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, - TableOptionsSqlite, TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, @@ -431,13 +429,15 @@ impl<'a> SessionPlanner<'a> { } DatabaseOptions::SQLITE => { let location: String = m.remove_required("location")?; + let mut storage_options = StorageOptions::try_from(m)?; + if let Some(creds) = creds_options { + storage_options_with_credentials(&mut storage_options, creds); + } - let access = SqliteAccess { - db: PathBuf::from(&location), - }; - access.validate_access().await?; - - DatabaseOptions::Sqlite(DatabaseOptionsSqlite { location }) + DatabaseOptions::Sqlite(DatabaseOptionsSqlite { + location: location.clone(), + storage_options: Some(storage_options), + }) } DatabaseOptions::DEBUG => { datasources::debug::validate_tunnel_connections(tunnel_options.as_ref())?; @@ -698,13 +698,20 @@ impl<'a> SessionPlanner<'a> { TableOptions::SQLITE => { let location: String = m.remove_required("location")?; let table: String = m.remove_required("table")?; + let mut storage_options = StorageOptions::try_from(m)?; + if let Some(creds) = creds_options { + storage_options_with_credentials(&mut storage_options, creds); + } - let access = SqliteAccess { - db: PathBuf::from(&location), - }; - access.validate_table_access(&table).await?; - - TableOptions::Sqlite(TableOptionsSqlite { location, table }) + TableOptions::Sqlite(TableOptionsObjectStore { + location, + storage_options, + name: table.into(), + file_type: None, + compression: None, + schema_sample_size: None, + columns: Vec::new(), + }) } TableOptions::LOCAL => { let location: String = m.remove_required("location")?; @@ -835,6 +842,7 @@ impl<'a> SessionPlanner<'a> { .insert(AzureConfigKey::AccessKey.as_ref().to_string(), access_key); TableOptions::Azure(TableOptionsObjectStore { + name: None, location: source_url, storage_options: opts, file_type: Some(file_type.to_string()), @@ -857,6 +865,7 @@ impl<'a> SessionPlanner<'a> { TableOptions::Delta(TableOptionsObjectStore { location, storage_options, + name: None, file_type: None, compression: None, columns: Vec::new(), @@ -870,6 +879,7 @@ impl<'a> SessionPlanner<'a> { TableOptions::Iceberg(TableOptionsObjectStore { location, storage_options, + name: None, file_type: None, compression: None, schema_sample_size: None, @@ -902,6 +912,7 @@ impl<'a> SessionPlanner<'a> { TableOptions::Lance(TableOptionsObjectStore { location, storage_options, + name: None, file_type: None, compression: None, schema_sample_size: None, @@ -925,6 +936,7 @@ impl<'a> SessionPlanner<'a> { TableOptions::Bson(TableOptionsObjectStore { location, storage_options, + name: None, file_type: None, compression: None, schema_sample_size, diff --git a/testdata/sqllogictests_object_store/gcs/sqlite.slt b/testdata/sqllogictests_object_store/gcs/sqlite.slt new file mode 100644 index 000000000..a07571551 --- /dev/null +++ b/testdata/sqllogictests_object_store/gcs/sqlite.slt @@ -0,0 +1,48 @@ +statement ok +CREATE CREDENTIALS gcp_creds PROVIDER gcp OPTIONS + ( service_account_key '${GCP_SERVICE_ACCOUNT_KEY}' ); + +query I +select count(*) from read_sqlite('gs://${GCS_BUCKET_NAME}/leaders.sqlite', gcp_creds, leaders); +---- +16386 + +query II +SELECT lead_count,location_count +FROM read_sqlite('gs://${GCS_BUCKET_NAME}/leaders.sqlite', gcp_creds, leaders) +WHERE name = 'Sam Kleinman'; +---- +175 57 + +statement ok +CREATE EXTERNAL TABLE leaders + FROM sqlite + OPTIONS ( + location = 'gs://${GCS_BUCKET_NAME}/leaders.sqlite', + table = 'leaders', + service_account_key ='${GCP_SERVICE_ACCOUNT_KEY}', + ); + +query I +select count(*) FROM leaders; +---- +16386 + +statement error +INSERT INTO leaders VALUES (21294,'B.F. White',1,0.0,0,1); + +statement ok +DROP TABLE leaders; + +statement ok +CREATE EXTERNAL DATABASE minutes + FROM sqlite + OPTIONS ( + location = 'gs://${GCS_BUCKET_NAME}/leaders.sqlite', + service_account_key ='${GCP_SERVICE_ACCOUNT_KEY}' + ); + +query I +SELECT id FROM minutes.public.leaders WHERE minutes.public.leaders.name = 'Sam Kleinman'; +---- +11517 \ No newline at end of file diff --git a/testdata/sqllogictests_sqlite/validation.slt b/testdata/sqllogictests_sqlite/validation.slt index a3addbbab..76b8da54d 100644 --- a/testdata/sqllogictests_sqlite/validation.slt +++ b/testdata/sqllogictests_sqlite/validation.slt @@ -1,31 +1,43 @@ # Validation tests for sqlite external database and external tables +# -# External database validation +# Originally these operations failed because GlareDB validated that +# the table existed when the table was created in the catalog, but +# these should error at query time: they may fail to exist now but may +# exist later. -# Validation test error with the wrong location -statement error unable to open database file +statement ok CREATE EXTERNAL DATABASE wrong_location FROM sqlite OPTIONS ( location = '/some/path/not/exists.db' ); -# External table validation -# Validation test error with the wrong location -statement error unable to open database file -CREATE EXTERNAL TABLE wrong_location +statement error +select * from wrong_location.public.datatypes; + +statement ok +CREATE EXTERNAL TABLE wrong_location2 FROM sqlite OPTIONS ( location = '/some/path/not/exists.db', table = 'datatypes' ); -# Validation test error with the wrong table -statement error no such table +statement error +select * from wrong_location2.datatypes; + +statement error +select * from wrong_location2.public.datatypes; + +statement ok CREATE EXTERNAL TABLE wrong_table FROM sqlite OPTIONS ( location = '${SQLITE_DB_LOCATION}', table = 'invalid_table' ); + +statement error +select * from wrong_table;