Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: external table for cassandra #2361

Merged
merged 5 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/datasources/src/cassandra/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub enum CassandraError {
QueryError(#[from] scylla::transport::errors::QueryError),
#[error("Unsupported DataType: {0}")]
UnsupportedDataType(String),
#[error("Table not found: {0}")]
TableNotFound(String),
}

pub type Result<T, E = CassandraError> = std::result::Result<T, E>;
27 changes: 23 additions & 4 deletions crates/datasources/src/cassandra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use self::exec::CassandraExec;

struct CassandraAccess {
pub struct CassandraAccess {
session: Session,
}

Expand Down Expand Up @@ -68,7 +69,11 @@ fn try_convert_dtype(ty: &ColumnType) -> Result<DataType> {

impl CassandraAccess {
pub async fn try_new(conn_str: impl AsRef<str>) -> Result<Self> {
let session = SessionBuilder::new().known_node(conn_str).build().await?;
let session = SessionBuilder::new()
.known_node(conn_str)
.connection_timeout(Duration::from_secs(10))
.build()
.await?;
Ok(Self { session })
}
async fn get_schema(&self, ks: &str, table: &str) -> Result<ArrowSchema> {
Expand All @@ -86,6 +91,17 @@ impl CassandraAccess {
.collect::<Result<_>>()?;
Ok(ArrowSchema::new(fields))
}
/// Checks that `ks.table` can be queried through this session.
///
/// Runs a `SELECT * FROM <ks>.<table> LIMIT 1` probe and inspects the column
/// specs of the response.
///
/// # Errors
///
/// * Propagates the driver's query error if the probe itself fails.
/// * Returns [`CassandraError::TableNotFound`] if the response carries no
///   column specs.
pub async fn validate_table_access(&self, ks: &str, table: &str) -> Result<()> {
    // NOTE(review): `ks` and `table` are interpolated verbatim into the CQL
    // text; callers should only pass trusted identifiers — confirm whether
    // quoting/validation is wanted here.
    let query = format!("SELECT * FROM {ks}.{table} LIMIT 1");
    let res = self.session.query(query, &[]).await?;
    if res.col_specs.is_empty() {
        // The variant already renders as "Table not found: {0}", so the payload
        // is just the qualified name; repeating "not found" produced a
        // stuttering message.
        return Err(CassandraError::TableNotFound(format!("{ks}.{table}")));
    }
    Ok(())
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -135,7 +151,7 @@ impl TableProvider for CassandraTableProvider {
_ctx: &SessionState,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
limit: Option<usize>,
) -> DatafusionResult<Arc<dyn ExecutionPlan>> {
let projected_schema = match projection {
Some(projection) => Arc::new(self.schema.project(projection)?),
Expand All @@ -150,10 +166,13 @@ impl TableProvider for CassandraTableProvider {
.map(|f| f.name().clone())
.collect::<Vec<_>>()
.join(",");
let query = format!(
let mut query = format!(
"SELECT {} FROM {}.{}",
projection_string, self.ks, self.table
);
if let Some(limit) = limit {
query.push_str(&format!(" LIMIT {}", limit));
}

let exec = CassandraExec::new(projected_schema, query, self.session.clone());
Ok(Arc::new(exec))
Expand Down
3 changes: 2 additions & 1 deletion crates/datasources/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ pub fn init_session_registry<'a>(
| TableOptions::MongoDb(_)
| TableOptions::Snowflake(_)
| TableOptions::SqlServer(_)
| TableOptions::Clickhouse(_) => continue,
| TableOptions::Clickhouse(_)
| TableOptions::Cassandra(_) => continue,
};

let base_url = access.base_url()?;
Expand Down
7 changes: 7 additions & 0 deletions crates/protogen/proto/metastore/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ message TableOptions {
TableOptionsObjectStore lance = 15;
TableOptionsObjectStore bson = 16;
TableOptionsClickhouse clickhouse = 17;
TableOptionsCassandra cassandra = 18;
}
// next: 19
}
Expand Down Expand Up @@ -229,6 +230,12 @@ message TableOptionsClickhouse {
string table = 2;
}

message TableOptionsCassandra {
string connection_string = 1;
universalmind303 marked this conversation as resolved.
Show resolved Hide resolved
string keyspace = 2;
string table = 3;
}

// Tunnel options

message TunnelOptions {
Expand Down
33 changes: 33 additions & 0 deletions crates/protogen/src/metastore/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ pub enum TableOptions {
Lance(TableOptionsObjectStore),
Bson(TableOptionsObjectStore),
Clickhouse(TableOptionsClickhouse),
Cassandra(TableOptionsCassandra),
}

impl TableOptions {
Expand All @@ -569,6 +570,7 @@ impl TableOptions {
pub const LANCE: &'static str = "lance";
pub const BSON: &'static str = "bson";
pub const CLICKHOUSE: &'static str = "clickhouse";
pub const CASSANDRA: &'static str = "cassandra";

pub const fn new_internal(columns: Vec<InternalColumnDefinition>) -> TableOptions {
TableOptions::Internal(TableOptionsInternal { columns })
Expand All @@ -593,6 +595,7 @@ impl TableOptions {
TableOptions::Lance(_) => Self::LANCE,
TableOptions::Bson(_) => Self::BSON,
TableOptions::Clickhouse(_) => Self::CLICKHOUSE,
TableOptions::Cassandra(_) => Self::CASSANDRA,
}
}
}
Expand Down Expand Up @@ -626,6 +629,7 @@ impl TryFrom<options::table_options::Options> for TableOptions {
options::table_options::Options::Clickhouse(v) => {
TableOptions::Clickhouse(v.try_into()?)
}
options::table_options::Options::Cassandra(v) => TableOptions::Cassandra(v.try_into()?),
})
}
}
Expand Down Expand Up @@ -658,6 +662,7 @@ impl TryFrom<TableOptions> for options::table_options::Options {
TableOptions::Lance(v) => options::table_options::Options::Lance(v.into()),
TableOptions::Bson(v) => options::table_options::Options::Bson(v.into()),
TableOptions::Clickhouse(v) => options::table_options::Options::Clickhouse(v.into()),
TableOptions::Cassandra(v) => options::table_options::Options::Cassandra(v.into()),
})
}
}
Expand Down Expand Up @@ -1025,6 +1030,34 @@ impl From<TableOptionsClickhouse> for options::TableOptionsClickhouse {
}
}

/// Options describing an external table backed by Cassandra.
///
/// Mirrors the `TableOptionsCassandra` protobuf message field for field.
#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)]
pub struct TableOptionsCassandra {
    /// Connection string used to reach the Cassandra cluster.
    pub connection_string: String,
    /// Keyspace that contains the table.
    pub keyspace: String,
    /// Name of the table within `keyspace`.
    pub table: String,
}

/// Builds the catalog-side options from their protobuf representation.
impl TryFrom<options::TableOptionsCassandra> for TableOptionsCassandra {
    type Error = ProtoConvError;

    /// Moves every field across unchanged; this conversion always returns `Ok`.
    fn try_from(value: options::TableOptionsCassandra) -> Result<Self, Self::Error> {
        let options::TableOptionsCassandra {
            connection_string,
            keyspace,
            table,
        } = value;

        Ok(Self {
            connection_string,
            keyspace,
            table,
        })
    }
}

impl From<TableOptionsCassandra> for options::TableOptionsCassandra {
fn from(value: TableOptionsCassandra) -> Self {
options::TableOptionsCassandra {
connection_string: value.connection_string,
keyspace: value.keyspace,
table: value.table,
}
}
}

#[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)]
pub struct TableOptionsSnowflake {
pub account_name: String,
Expand Down
23 changes: 19 additions & 4 deletions crates/sqlexec/src/dispatch/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use datafusion::prelude::SessionContext;
use datafusion_ext::functions::{DefaultTableContextProvider, FuncParamValue};
use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess};
use datasources::bson::table::bson_streaming_table;
use datasources::cassandra::CassandraTableProvider;
use datasources::clickhouse::{ClickhouseAccess, ClickhouseTableProvider};
use datasources::common::url::DatasourceUrl;
use datasources::debug::DebugTableType;
Expand All @@ -36,10 +37,10 @@ use protogen::metastore::types::options::{
DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, DatabaseOptionsDebug,
DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, TableOptions,
TableOptionsBigQuery, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs,
TableOptionsInternal, TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql,
TableOptionsObjectStore, TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake,
TableOptionsSqlServer, TunnelOptions,
TableOptionsBigQuery, TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug,
TableOptionsGcs, TableOptionsInternal, TableOptionsLocal, TableOptionsMongoDb,
TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres, TableOptionsS3,
TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions,
};
use sqlbuiltins::builtins::DEFAULT_CATALOG;
use sqlbuiltins::functions::FUNCTION_REGISTRY;
Expand Down Expand Up @@ -497,6 +498,20 @@ impl<'a> ExternalDispatcher<'a> {
.await?,
)
}
TableOptions::Cassandra(TableOptionsCassandra {
connection_string,
keyspace,
table,
}) => {
let table = CassandraTableProvider::try_new(
connection_string.clone(),
keyspace.clone(),
table.clone(),
)
.await?;

Ok(Arc::new(table))
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/sqlexec/src/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub enum DispatchError {
SshKey(#[from] datasources::common::ssh::key::SshKeyError),
#[error(transparent)]
ExtensionError(#[from] datafusion_ext::errors::ExtensionError),
#[error(transparent)]
CassandraDatasource(#[from] datasources::cassandra::CassandraError),
}

impl DispatchError {
Expand Down
1 change: 1 addition & 0 deletions crates/sqlexec/src/planner/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl_from_dispatch_variant!(datasources::lake::iceberg::errors::IcebergError);
impl_from_dispatch_variant!(datasources::object_store::errors::ObjectStoreSourceError);
impl_from_dispatch_variant!(datasources::sqlserver::errors::SqlServerError);
impl_from_dispatch_variant!(datasources::clickhouse::errors::ClickhouseError);
impl_from_dispatch_variant!(datasources::cassandra::CassandraError);

#[allow(unused_macros)]
macro_rules! internal {
Expand Down
22 changes: 18 additions & 4 deletions crates/sqlexec/src/planner/session_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use datafusion::sql::TableReference;
use datafusion_ext::planner::SqlQueryPlanner;
use datafusion_ext::AsyncContextProvider;
use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess};
use datasources::cassandra::CassandraAccess;
use datasources::clickhouse::ClickhouseAccess;
use datasources::common::ssh::{key::SshKey, SshConnection, SshConnectionParameters};
use datasources::common::url::{DatasourceUrl, DatasourceUrlType};
Expand Down Expand Up @@ -46,10 +47,10 @@ use protogen::metastore::types::options::{
DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog,
DeltaLakeUnityCatalog, StorageOptions, TableOptions, TableOptionsBigQuery,
TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsLocal,
TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres,
TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions,
TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
TableOptionsCassandra, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs,
TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore,
TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer,
TunnelOptions, TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
};
use protogen::metastore::types::service::{AlterDatabaseOperation, AlterTableOperation};
use sqlbuiltins::builtins::{CURRENT_SESSION_SCHEMA, DEFAULT_CATALOG};
Expand Down Expand Up @@ -524,6 +525,19 @@ impl<'a> SessionPlanner<'a> {
table: table_name,
})
}
TableOptions::CASSANDRA => {
let connection_string: String = m.remove_required("connection_string")?;
let keyspace: String = m.remove_required("keyspace")?;
let table: String = m.remove_required("table")?;
let access = CassandraAccess::try_new(connection_string.clone()).await?;
access.validate_table_access(&keyspace, &table).await?;

TableOptions::Cassandra(TableOptionsCassandra {
connection_string,
keyspace,
table,
})
}
TableOptions::LOCAL => {
let location: String = m.remove_required("location")?;

Expand Down
12 changes: 12 additions & 0 deletions testdata/sqllogictests_cassandra/basic.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Basic tests for external tables.

statement ok
CREATE EXTERNAL TABLE basic
FROM cassandra
OPTIONS (
connection_string = '${CASSANDRA_CONN_STRING}',
keyspace = 'test',
table = 'bikeshare_stations'
);

include ${PWD}/testdata/sqllogictests_datasources_common/include/basic.slti
19 changes: 19 additions & 0 deletions testdata/sqllogictests_cassandra/external_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Basic tests for external tables.

statement ok
CREATE EXTERNAL TABLE external_table
FROM cassandra
OPTIONS (
connection_string = '127.0.0.1:9042',
keyspace = 'test',
table = 'bikeshare_stations',
);

query I
SELECT count(*) FROM external_table;
----
102

statement ok
DROP TABLE external_table;

21 changes: 21 additions & 0 deletions testdata/sqllogictests_cassandra/validation.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Validation tests for cassandra external database and external tables

# External database validation

statement error
CREATE EXTERNAL DATABASE wrong_connection_string
FROM cassandra
OPTIONS (
connection_string = '127.0.0.1:9876',
);

# Validation test error with the wrong table name
statement error
CREATE EXTERNAL TABLE missing_table
FROM cassandra
OPTIONS (
connection_string = '${CASSANDRA_CONN_STRING}',
keyspace = 'test',
table = 'missing_table'
);

Loading