diff --git a/crates/datasources/src/cassandra/errors.rs b/crates/datasources/src/cassandra/errors.rs index 9d65b07be..5c9123291 100644 --- a/crates/datasources/src/cassandra/errors.rs +++ b/crates/datasources/src/cassandra/errors.rs @@ -6,6 +6,8 @@ pub enum CassandraError { QueryError(#[from] scylla::transport::errors::QueryError), #[error("Unsupported DataType: {0}")] UnsupportedDataType(String), + #[error("Table not found: {0}")] + TableNotFound(String), } pub type Result = std::result::Result; diff --git a/crates/datasources/src/cassandra/mod.rs b/crates/datasources/src/cassandra/mod.rs index 2c150e944..caa604e46 100644 --- a/crates/datasources/src/cassandra/mod.rs +++ b/crates/datasources/src/cassandra/mod.rs @@ -37,10 +37,11 @@ use std::fmt; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; use self::exec::CassandraExec; -struct CassandraAccess { +pub struct CassandraAccess { session: Session, } @@ -67,8 +68,12 @@ fn try_convert_dtype(ty: &ColumnType) -> Result { } impl CassandraAccess { - pub async fn try_new(conn_str: impl AsRef) -> Result { - let session = SessionBuilder::new().known_node(conn_str).build().await?; + pub async fn try_new(host: impl AsRef) -> Result { + let session = SessionBuilder::new() + .known_node(host) + .connection_timeout(Duration::from_secs(10)) + .build() + .await?; Ok(Self { session }) } async fn get_schema(&self, ks: &str, table: &str) -> Result { @@ -86,6 +91,17 @@ impl CassandraAccess { .collect::>()?; Ok(ArrowSchema::new(fields)) } + pub async fn validate_table_access(&self, ks: &str, table: &str) -> Result<()> { + let query = format!("SELECT * FROM {ks}.{table} LIMIT 1"); + let res = self.session.query(query, &[]).await?; + if res.col_specs.is_empty() { + return Err(CassandraError::TableNotFound(format!( + "table {} not found in keyspace {}", + table, ks + ))); + } + Ok(()) + } } #[derive(Debug, Clone)] @@ -97,8 +113,8 @@ pub struct CassandraTableProvider { } impl 
CassandraTableProvider { - pub async fn try_new(conn_str: String, ks: String, table: String) -> Result { - let access = CassandraAccess::try_new(conn_str).await?; + pub async fn try_new(host: String, ks: String, table: String) -> Result { + let access = CassandraAccess::try_new(host).await?; let schema = access.get_schema(&ks, &table).await?; Ok(Self { schema: Arc::new(schema), @@ -135,7 +151,7 @@ impl TableProvider for CassandraTableProvider { _ctx: &SessionState, projection: Option<&Vec>, _filters: &[Expr], - _limit: Option, + limit: Option, ) -> DatafusionResult> { let projected_schema = match projection { Some(projection) => Arc::new(self.schema.project(projection)?), @@ -150,10 +166,13 @@ impl TableProvider for CassandraTableProvider { .map(|f| f.name().clone()) .collect::>() .join(","); - let query = format!( + let mut query = format!( "SELECT {} FROM {}.{}", projection_string, self.ks, self.table ); + if let Some(limit) = limit { + query.push_str(&format!(" LIMIT {}", limit)); + } let exec = CassandraExec::new(projected_schema, query, self.session.clone()); Ok(Arc::new(exec)) diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index 41bd5d480..92d05a024 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -403,7 +403,8 @@ pub fn init_session_registry<'a>( | TableOptions::MongoDb(_) | TableOptions::Snowflake(_) | TableOptions::SqlServer(_) - | TableOptions::Clickhouse(_) => continue, + | TableOptions::Clickhouse(_) + | TableOptions::Cassandra(_) => continue, }; let base_url = access.base_url()?; diff --git a/crates/protogen/proto/metastore/options.proto b/crates/protogen/proto/metastore/options.proto index 7eca311e8..02f0cdb78 100644 --- a/crates/protogen/proto/metastore/options.proto +++ b/crates/protogen/proto/metastore/options.proto @@ -128,6 +128,7 @@ message TableOptions { TableOptionsObjectStore lance = 15; TableOptionsObjectStore bson = 16; 
TableOptionsClickhouse clickhouse = 17; + TableOptionsCassandra cassandra = 18; } // next: 18 } @@ -230,6 +231,12 @@ message TableOptionsClickhouse { optional string database = 3; } +message TableOptionsCassandra { + string host = 1; + string keyspace = 2; + string table = 3; +} + // Tunnel options message TunnelOptions { diff --git a/crates/protogen/src/metastore/types/options.rs b/crates/protogen/src/metastore/types/options.rs index b001fa12d..7cc781c53 100644 --- a/crates/protogen/src/metastore/types/options.rs +++ b/crates/protogen/src/metastore/types/options.rs @@ -549,6 +549,7 @@ pub enum TableOptions { Lance(TableOptionsObjectStore), Bson(TableOptionsObjectStore), Clickhouse(TableOptionsClickhouse), + Cassandra(TableOptionsCassandra), } impl TableOptions { @@ -569,6 +570,7 @@ impl TableOptions { pub const LANCE: &'static str = "lance"; pub const BSON: &'static str = "bson"; pub const CLICKHOUSE: &'static str = "clickhouse"; + pub const CASSANDRA: &'static str = "cassandra"; pub const fn new_internal(columns: Vec) -> TableOptions { TableOptions::Internal(TableOptionsInternal { columns }) @@ -593,6 +595,7 @@ impl TableOptions { TableOptions::Lance(_) => Self::LANCE, TableOptions::Bson(_) => Self::BSON, TableOptions::Clickhouse(_) => Self::CLICKHOUSE, + TableOptions::Cassandra(_) => Self::CASSANDRA, } } } @@ -626,6 +629,7 @@ impl TryFrom for TableOptions { options::table_options::Options::Clickhouse(v) => { TableOptions::Clickhouse(v.try_into()?) 
} + options::table_options::Options::Cassandra(v) => TableOptions::Cassandra(v.try_into()?), }) } } @@ -658,6 +662,7 @@ impl TryFrom for options::table_options::Options { TableOptions::Lance(v) => options::table_options::Options::Lance(v.into()), TableOptions::Bson(v) => options::table_options::Options::Bson(v.into()), TableOptions::Clickhouse(v) => options::table_options::Options::Clickhouse(v.into()), + TableOptions::Cassandra(v) => options::table_options::Options::Cassandra(v.into()), }) } } @@ -1028,6 +1033,34 @@ impl From for options::TableOptionsClickhouse { } } +#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)] +pub struct TableOptionsCassandra { + pub host: String, + pub keyspace: String, + pub table: String, +} + +impl TryFrom for TableOptionsCassandra { + type Error = ProtoConvError; + fn try_from(value: options::TableOptionsCassandra) -> Result { + Ok(TableOptionsCassandra { + host: value.host, + keyspace: value.keyspace, + table: value.table, + }) + } +} + +impl From for options::TableOptionsCassandra { + fn from(value: TableOptionsCassandra) -> Self { + options::TableOptionsCassandra { + host: value.host, + keyspace: value.keyspace, + table: value.table, + } + } +} + #[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)] pub struct TableOptionsSnowflake { pub account_name: String, diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs index b66e93a73..f9724d086 100644 --- a/crates/sqlexec/src/dispatch/external.rs +++ b/crates/sqlexec/src/dispatch/external.rs @@ -13,6 +13,7 @@ use datafusion::prelude::SessionContext; use datafusion_ext::functions::{DefaultTableContextProvider, FuncParamValue}; use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess}; use datasources::bson::table::bson_streaming_table; +use datasources::cassandra::CassandraTableProvider; use datasources::clickhouse::{ClickhouseAccess, ClickhouseTableProvider, OwnedClickhouseTableRef}; use datasources::common::url::DatasourceUrl; 
use datasources::debug::DebugTableType; @@ -36,10 +37,10 @@ use protogen::metastore::types::options::{ DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql, DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, TableOptions, - TableOptionsBigQuery, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, - TableOptionsInternal, TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql, - TableOptionsObjectStore, TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, - TableOptionsSqlServer, TunnelOptions, + TableOptionsBigQuery, TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug, + TableOptionsGcs, TableOptionsInternal, TableOptionsLocal, TableOptionsMongoDb, + TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres, TableOptionsS3, + TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions, }; use sqlbuiltins::builtins::DEFAULT_CATALOG; use sqlbuiltins::functions::FUNCTION_REGISTRY; @@ -501,6 +502,17 @@ impl<'a> ExternalDispatcher<'a> { .await?, ) } + TableOptions::Cassandra(TableOptionsCassandra { + host, + keyspace, + table, + }) => { + let table = + CassandraTableProvider::try_new(host.clone(), keyspace.clone(), table.clone()) + .await?; + + Ok(Arc::new(table)) + } } } diff --git a/crates/sqlexec/src/dispatch/mod.rs b/crates/sqlexec/src/dispatch/mod.rs index 5529b99be..baf39fd22 100644 --- a/crates/sqlexec/src/dispatch/mod.rs +++ b/crates/sqlexec/src/dispatch/mod.rs @@ -97,6 +97,8 @@ pub enum DispatchError { SshKey(#[from] datasources::common::ssh::key::SshKeyError), #[error(transparent)] ExtensionError(#[from] datafusion_ext::errors::ExtensionError), + #[error(transparent)] + CassandraDatasource(#[from] datasources::cassandra::CassandraError), #[error("{0}")] String(String), diff --git a/crates/sqlexec/src/planner/errors.rs b/crates/sqlexec/src/planner/errors.rs index 04f9eeb7b..0e4492bbd 100644 
--- a/crates/sqlexec/src/planner/errors.rs +++ b/crates/sqlexec/src/planner/errors.rs @@ -129,6 +129,7 @@ impl_from_dispatch_variant!(datasources::lake::iceberg::errors::IcebergError); impl_from_dispatch_variant!(datasources::object_store::errors::ObjectStoreSourceError); impl_from_dispatch_variant!(datasources::sqlserver::errors::SqlServerError); impl_from_dispatch_variant!(datasources::clickhouse::errors::ClickhouseError); +impl_from_dispatch_variant!(datasources::cassandra::CassandraError); #[allow(unused_macros)] macro_rules! internal { diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index 015541720..53029bfba 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -14,6 +14,7 @@ use datafusion::sql::TableReference; use datafusion_ext::planner::SqlQueryPlanner; use datafusion_ext::AsyncContextProvider; use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess}; +use datasources::cassandra::CassandraAccess; use datasources::clickhouse::{ClickhouseAccess, ClickhouseTableRef}; use datasources::common::ssh::{key::SshKey, SshConnection, SshConnectionParameters}; use datasources::common::url::{DatasourceUrl, DatasourceUrlType}; @@ -46,10 +47,10 @@ use protogen::metastore::types::options::{ DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql, DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog, DeltaLakeUnityCatalog, StorageOptions, TableOptions, TableOptionsBigQuery, - TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsLocal, - TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres, - TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions, - TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh, + TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, + 
TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, + TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, + TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh, }; use protogen::metastore::types::service::{AlterDatabaseOperation, AlterTableOperation}; use sqlbuiltins::builtins::{CURRENT_SESSION_SCHEMA, DEFAULT_CATALOG}; @@ -533,6 +534,19 @@ impl<'a> SessionPlanner<'a> { database: database_name, }) } + TableOptions::CASSANDRA => { + let host: String = m.remove_required("host")?; + let keyspace: String = m.remove_required("keyspace")?; + let table: String = m.remove_required("table")?; + let access = CassandraAccess::try_new(host.clone()).await?; + access.validate_table_access(&keyspace, &table).await?; + + TableOptions::Cassandra(TableOptionsCassandra { + host, + keyspace, + table, + }) + } TableOptions::LOCAL => { let location: String = m.remove_required("location")?; diff --git a/testdata/sqllogictests_cassandra/basic.slt b/testdata/sqllogictests_cassandra/basic.slt new file mode 100644 index 000000000..a19a5c857 --- /dev/null +++ b/testdata/sqllogictests_cassandra/basic.slt @@ -0,0 +1,12 @@ +# Basic tests for external tables. + +statement ok +CREATE EXTERNAL TABLE basic + FROM cassandra + OPTIONS ( + host = '${CASSANDRA_CONN_STRING}', + keyspace = 'test', + table = 'bikeshare_stations' + ); + +include ${PWD}/testdata/sqllogictests_datasources_common/include/basic.slti diff --git a/testdata/sqllogictests_cassandra/external_table.slt b/testdata/sqllogictests_cassandra/external_table.slt new file mode 100644 index 000000000..42cc7697a --- /dev/null +++ b/testdata/sqllogictests_cassandra/external_table.slt @@ -0,0 +1,19 @@ +# Basic tests for external tables. 
+ +statement ok +CREATE EXTERNAL TABLE external_table + FROM cassandra + OPTIONS ( + host = '127.0.0.1:9042', + keyspace = 'test', + table = 'bikeshare_stations', + ); + +query I +SELECT count(*) FROM external_table; +---- +102 + +statement ok +DROP TABLE external_table; + diff --git a/testdata/sqllogictests_cassandra/validation.slt b/testdata/sqllogictests_cassandra/validation.slt new file mode 100644 index 000000000..ee06f38bf --- /dev/null +++ b/testdata/sqllogictests_cassandra/validation.slt @@ -0,0 +1,21 @@ +# Validation tests for cassandra external database and external tables + +# External database validation + +statement error +CREATE EXTERNAL DATABASE wrong_host + FROM cassandra + OPTIONS ( + host = '127.0.0.1:9876', + ); + +# Validation test error with the wrong table name +statement error +CREATE EXTERNAL TABLE missing_table + FROM cassandra + OPTIONS ( + host = '${CASSANDRA_CONN_STRING}', + keyspace = 'test', + table = 'missing_table' + ); +