diff --git a/crates/datasources/src/mongodb/errors.rs b/crates/datasources/src/mongodb/errors.rs
index aa449e771..541a4dd2e 100644
--- a/crates/datasources/src/mongodb/errors.rs
+++ b/crates/datasources/src/mongodb/errors.rs
@@ -1,5 +1,5 @@
 #[derive(Debug, thiserror::Error)]
-pub enum MongoError {
+pub enum MongoDbError {
     #[error("Failed to merge schemas: {0}")]
     FailedSchemaMerge(datafusion::arrow::error::ArrowError),
@@ -19,7 +19,7 @@ pub enum MongoError {
     Bson(#[from] crate::bson::errors::BsonError),
     #[error(transparent)]
-    RawBSON(#[from] mongodb::bson::raw::Error),
+    RawBson(#[from] mongodb::bson::raw::Error),
 }
-pub type Result = std::result::Result;
+pub type Result = std::result::Result;
diff --git a/crates/datasources/src/mongodb/exec.rs b/crates/datasources/src/mongodb/exec.rs
index 570e90101..c5e632496 100644
--- a/crates/datasources/src/mongodb/exec.rs
+++ b/crates/datasources/src/mongodb/exec.rs
@@ -22,24 +22,24 @@ use mongodb::Cursor;
 use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
-use super::errors::{MongoError, Result};
+use super::errors::{MongoDbError, Result};
 use crate::bson::builder::RecordStructBuilder;
 #[derive(Debug)]
-pub struct MongoBsonExec {
+pub struct MongoDbBsonExec {
     cursor: Mutex>>,
     schema: Arc,
     limit: Option,
     metrics: ExecutionPlanMetricsSet,
 }
-impl MongoBsonExec {
+impl MongoDbBsonExec {
     pub fn new(
         cursor: Mutex>>,
         schema: Arc,
         limit: Option,
-    ) -> MongoBsonExec {
-        MongoBsonExec {
+    ) -> MongoDbBsonExec {
+        MongoDbBsonExec {
             cursor,
             schema,
             limit,
@@ -48,7 +48,7 @@ impl MongoBsonExec {
     }
 }
-impl ExecutionPlan for MongoBsonExec {
+impl ExecutionPlan for MongoDbBsonExec {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -74,7 +74,7 @@ impl ExecutionPlan for MongoBsonExec {
         _children: Vec>,
     ) -> DatafusionResult> {
         Err(DataFusionError::Execution(
-            "cannot replace children for MongoDBExec".to_string(),
+            "cannot replace children for MongoDB Exec".to_string(),
         ))
     }
@@ -114,7 +114,7 @@ impl ExecutionPlan for MongoBsonExec {
     }
 }
-impl DisplayAs for MongoBsonExec {
+impl DisplayAs for MongoDbBsonExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "MongoBsonExec")
+        write!(f, "MongoDbBsonExec")
     }
 }
@@ -174,7 +174,7 @@ impl RecordBatchStream for BsonStream {
     }
 }
-fn document_chunk_to_record_batch>(
+fn document_chunk_to_record_batch>(
     chunk: Vec>,
     fields: Fields,
 ) -> Result {
diff --git a/crates/datasources/src/mongodb/mod.rs b/crates/datasources/src/mongodb/mod.rs
index 3920f35a7..a8026d480 100644
--- a/crates/datasources/src/mongodb/mod.rs
+++ b/crates/datasources/src/mongodb/mod.rs
@@ -6,8 +6,8 @@ mod infer;
 use datafusion_ext::errors::ExtensionError;
 use datafusion_ext::functions::VirtualLister;
-use errors::{MongoError, Result};
-use exec::MongoBsonExec;
+use errors::{MongoDbError, Result};
+use exec::MongoDbBsonExec;
 use infer::TableSampler;
 use async_trait::async_trait;
@@ -35,36 +35,36 @@ use tracing::debug;
 const ID_FIELD_NAME: &str = "_id";
 #[derive(Debug)]
-pub enum MongoProtocol {
+pub enum MongoDbProtocol {
     MongoDb,
     MongoDbSrv,
 }
-impl Default for MongoProtocol {
+impl Default for MongoDbProtocol {
     fn default() -> Self {
         Self::MongoDb
     }
 }
-impl MongoProtocol {
+impl MongoDbProtocol {
     const MONGODB: &'static str = "mongodb";
     const MONGODB_SRV: &'static str = "mongodb+srv";
 }
-impl FromStr for MongoProtocol {
-    type Err = MongoError;
+impl FromStr for MongoDbProtocol {
+    type Err = MongoDbError;
     fn from_str(s: &str) -> std::result::Result {
         let proto = match s {
             Self::MONGODB => Self::MongoDb,
             Self::MONGODB_SRV => Self::MongoDbSrv,
-            s => return Err(MongoError::InvalidProtocol(s.to_owned())),
+            s => return Err(MongoDbError::InvalidProtocol(s.to_owned())),
         };
         Ok(proto)
     }
 }
-impl Display for MongoProtocol {
+impl Display for MongoDbProtocol {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         let s = match self {
             Self::MongoDb => Self::MONGODB,
@@ -78,7 +78,7 @@ impl Display for MongoProtocol {
 pub enum MongoDbConnection {
     ConnectionString(String),
     Parameters {
-        protocol: MongoProtocol,
+        protocol: MongoDbProtocol,
         host: String,
         port: Option,
         user: String,
@@ -107,7 +107,7 @@ impl MongoDbConnection {
         }
         // Address
         write!(&mut conn_str, "@{host}").unwrap();
-        if matches!(protocol, MongoProtocol::MongoDb) {
+        if matches!(protocol, MongoDbProtocol::MongoDb) {
             // Only attempt to write port if the protocol is "mongodb"
             if let Some(port) = port {
                 write!(&mut conn_str, ":{port}").unwrap();
@@ -120,17 +120,17 @@ impl MongoDbConnection {
 }
 #[derive(Debug, Clone)]
-pub struct MongoAccessor {
+pub struct MongoDbAccessor {
     client: Client,
 }
-impl MongoAccessor {
-    pub async fn connect(connection_string: &str) -> Result {
+impl MongoDbAccessor {
+    pub async fn connect(connection_string: &str) -> Result {
         let mut opts = ClientOptions::parse(connection_string).await?;
         opts.app_name = Some("GlareDB (MongoDB Data source)".to_string());
         let client = Client::with_options(opts)?;
-        Ok(MongoAccessor { client })
+        Ok(MongoDbAccessor { client })
     }
     pub async fn validate_external_database(connection_string: &str) -> Result<()> {
@@ -144,8 +144,8 @@ impl MongoAccessor {
         Ok(())
     }
-    pub fn into_table_accessor(self, info: MongoTableAccessInfo) -> MongoTableAccessor {
-        MongoTableAccessor {
+    pub fn into_table_accessor(self, info: MongoDbTableAccessInfo) -> MongoDbTableAccessor {
+        MongoDbTableAccessor {
             info,
             client: self.client,
         }
@@ -153,7 +153,7 @@ impl MongoAccessor {
 }
 #[async_trait]
-impl VirtualLister for MongoAccessor {
+impl VirtualLister for MongoDbAccessor {
     async fn list_schemas(&self) -> Result, ExtensionError> {
         use ExtensionError::ListingErrBoxed;
@@ -198,18 +198,18 @@ impl VirtualLister for MongoAccessor {
 }
 #[derive(Debug, Clone)]
-pub struct MongoTableAccessInfo {
+pub struct MongoDbTableAccessInfo {
     pub database: String, // "Schema"
     pub collection: String,
 }
 #[derive(Debug, Clone)]
-pub struct MongoTableAccessor {
-    info: MongoTableAccessInfo,
+pub struct MongoDbTableAccessor {
+    info: MongoDbTableAccessInfo,
     client: Client,
 }
-impl MongoTableAccessor {
+impl MongoDbTableAccessor {
     /// Validate that we can access the table.
     pub async fn validate(&self) -> Result<()> {
         let _ = self
@@ -222,7 +222,7 @@ impl MongoTableAccessor {
         Ok(())
     }
-    pub async fn into_table_provider(self) -> Result {
+    pub async fn into_table_provider(self) -> Result {
         let collection = self
             .client
             .database(&self.info.database)
@@ -231,7 +231,7 @@ impl MongoTableAccessor {
         let schema = sampler.infer_schema_from_sample().await?;
-        Ok(MongoTableProvider {
+        Ok(MongoDbTableProvider {
             schema: Arc::new(schema),
             collection: self
                 .client
@@ -241,13 +241,13 @@ impl MongoTableAccessor {
     }
 }
-pub struct MongoTableProvider {
+pub struct MongoDbTableProvider {
     schema: Arc,
     collection: Collection,
 }
 #[async_trait]
-impl TableProvider for MongoTableProvider {
+impl TableProvider for MongoDbTableProvider {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -317,7 +317,7 @@ impl TableProvider for MongoTableProvider {
                 .await
                 .map_err(|e| DataFusionError::External(Box::new(e)))?,
         ));
-        Ok(Arc::new(MongoBsonExec::new(cursor, schema, limit)))
+        Ok(Arc::new(MongoDbBsonExec::new(cursor, schema, limit)))
     }
 }
@@ -427,7 +427,7 @@ mod tests {
         assert_eq!(&conn_str, "mongodb://prod:password123@127.0.0.1:5432");
         let conn_str = MongoDbConnection::Parameters {
-            protocol: MongoProtocol::MongoDb,
+            protocol: MongoDbProtocol::MongoDb,
             host: "127.0.0.1".to_string(),
             port: Some(5432),
             user: "prod".to_string(),
@@ -437,7 +437,7 @@ mod tests {
         assert_eq!(&conn_str, "mongodb://prod:password123@127.0.0.1:5432");
         let conn_str = MongoDbConnection::Parameters {
-            protocol: MongoProtocol::MongoDbSrv,
+            protocol: MongoDbProtocol::MongoDbSrv,
             host: "127.0.0.1".to_string(),
             port: Some(5432),
             user: "prod".to_string(),
diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs
index e8b0872fd..41bd5d480 100644
--- a/crates/datasources/src/object_store/mod.rs
+++ b/crates/datasources/src/object_store/mod.rs
@@ -400,7 +400,7 @@ pub fn init_session_registry<'a>(
             | TableOptions::Postgres(_)
             | TableOptions::BigQuery(_)
             | TableOptions::Mysql(_)
-            | TableOptions::Mongo(_)
+            | TableOptions::MongoDb(_)
             | TableOptions::Snowflake(_)
             | TableOptions::SqlServer(_)
             | TableOptions::Clickhouse(_) => continue,
diff --git a/crates/protogen/proto/metastore/options.proto b/crates/protogen/proto/metastore/options.proto
index 927209594..3cb0cb9df 100644
--- a/crates/protogen/proto/metastore/options.proto
+++ b/crates/protogen/proto/metastore/options.proto
@@ -42,7 +42,7 @@ message DatabaseOptions {
     DatabaseOptionsPostgres postgres = 3;
     DatabaseOptionsBigQuery bigquery = 4;
     DatabaseOptionsMysql mysql = 5;
-    DatabaseOptionsMongo mongo = 6;
+    DatabaseOptionsMongoDb mongodb = 6;
     DatabaseOptionsSnowflake snowflake = 7;
     DatabaseOptionsDeltaLake delta = 8;
     DatabaseOptionsSqlServer sql_server = 9;
@@ -68,7 +68,7 @@ message DatabaseOptionsMysql {
   string connection_string = 1;
 }
-message DatabaseOptionsMongo {
+message DatabaseOptionsMongoDb {
   string connection_string = 1;
 }
diff --git a/crates/protogen/src/metastore/types/options.rs b/crates/protogen/src/metastore/types/options.rs
index 1b6509839..a82830c37 100644
--- a/crates/protogen/src/metastore/types/options.rs
+++ b/crates/protogen/src/metastore/types/options.rs
@@ -88,7 +88,7 @@ pub enum DatabaseOptions {
     Postgres(DatabaseOptionsPostgres),
     BigQuery(DatabaseOptionsBigQuery),
     Mysql(DatabaseOptionsMysql),
-    Mongo(DatabaseOptionsMongo),
+    MongoDb(DatabaseOptionsMongoDb),
     Snowflake(DatabaseOptionsSnowflake),
     Delta(DatabaseOptionsDeltaLake),
     SqlServer(DatabaseOptionsSqlServer),
@@ -101,7 +101,7 @@ impl DatabaseOptions {
     pub const POSTGRES: &'static str = "postgres";
     pub const BIGQUERY: &'static str = "bigquery";
     pub const MYSQL: &'static str = "mysql";
-    pub const MONGO: &'static str = "mongo";
+    pub const MONGODB: &'static str = "mongo";
     pub const SNOWFLAKE: &'static str = "snowflake";
     pub const DELTA: &'static str = "delta";
     pub const SQL_SERVER: &'static str = "sql_server";
@@ -114,7 +114,7 @@ impl DatabaseOptions {
             DatabaseOptions::Postgres(_) => Self::POSTGRES,
             DatabaseOptions::BigQuery(_) => Self::BIGQUERY,
             DatabaseOptions::Mysql(_) => Self::MYSQL,
-            DatabaseOptions::Mongo(_) => Self::MONGO,
+            DatabaseOptions::MongoDb(_) => Self::MONGODB,
             DatabaseOptions::Snowflake(_) => Self::SNOWFLAKE,
             DatabaseOptions::Delta(_) => Self::DELTA,
             DatabaseOptions::SqlServer(_) => Self::SQL_SERVER,
@@ -144,7 +144,9 @@ impl TryFrom for DatabaseOptions {
                 DatabaseOptions::BigQuery(v.try_into()?)
             }
             options::database_options::Options::Mysql(v) => DatabaseOptions::Mysql(v.try_into()?),
-            options::database_options::Options::Mongo(v) => DatabaseOptions::Mongo(v.try_into()?),
+            options::database_options::Options::Mongodb(v) => {
+                DatabaseOptions::MongoDb(v.try_into()?)
+            }
             options::database_options::Options::Snowflake(v) => {
                 DatabaseOptions::Snowflake(v.try_into()?)
             }
@@ -174,7 +176,7 @@ impl From for options::database_options::Options {
             DatabaseOptions::Postgres(v) => options::database_options::Options::Postgres(v.into()),
             DatabaseOptions::BigQuery(v) => options::database_options::Options::Bigquery(v.into()),
             DatabaseOptions::Mysql(v) => options::database_options::Options::Mysql(v.into()),
-            DatabaseOptions::Mongo(v) => options::database_options::Options::Mongo(v.into()),
+            DatabaseOptions::MongoDb(v) => options::database_options::Options::Mongodb(v.into()),
             DatabaseOptions::Snowflake(v) => {
                 options::database_options::Options::Snowflake(v.into())
             }
@@ -299,22 +301,22 @@ impl From for options::DatabaseOptionsMysql {
 }
 #[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)]
-pub struct DatabaseOptionsMongo {
+pub struct DatabaseOptionsMongoDb {
     pub connection_string: String,
 }
-impl TryFrom for DatabaseOptionsMongo {
+impl TryFrom for DatabaseOptionsMongoDb {
     type Error = ProtoConvError;
-    fn try_from(value: options::DatabaseOptionsMongo) -> Result {
-        Ok(DatabaseOptionsMongo {
+    fn try_from(value: options::DatabaseOptionsMongoDb) -> Result {
+        Ok(DatabaseOptionsMongoDb {
             connection_string: value.connection_string,
         })
     }
 }
-impl From for options::DatabaseOptionsMongo {
-    fn from(value: DatabaseOptionsMongo) -> Self {
-        options::DatabaseOptionsMongo {
+impl From for options::DatabaseOptionsMongoDb {
+    fn from(value: DatabaseOptionsMongoDb) -> Self {
+        options::DatabaseOptionsMongoDb {
             connection_string: value.connection_string,
         }
     }
@@ -538,7 +540,7 @@ pub enum TableOptions {
     Local(TableOptionsLocal),
     Gcs(TableOptionsGcs),
     S3(TableOptionsS3),
-    Mongo(TableOptionsMongo),
+    MongoDb(TableOptionsMongoDb),
     Snowflake(TableOptionsSnowflake),
     Delta(TableOptionsObjectStore),
     Iceberg(TableOptionsObjectStore),
@@ -558,7 +560,7 @@ impl TableOptions {
     pub const LOCAL: &'static str = "local";
     pub const GCS: &'static str = "gcs";
    pub const S3_STORAGE: &'static str = "s3";
-    pub const MONGO: &'static str = "mongo";
+    pub const MONGODB: &'static str = "mongo";
     pub const SNOWFLAKE: &'static str = "snowflake";
     pub const DELTA: &'static str = "delta";
     pub const ICEBERG: &'static str = "iceberg";
@@ -582,7 +584,7 @@ impl TableOptions {
             TableOptions::Local(_) => Self::LOCAL,
             TableOptions::Gcs(_) => Self::GCS,
             TableOptions::S3(_) => Self::S3_STORAGE,
-            TableOptions::Mongo(_) => Self::MONGO,
+            TableOptions::MongoDb(_) => Self::MONGODB,
             TableOptions::Snowflake(_) => Self::SNOWFLAKE,
             TableOptions::Delta(_) => Self::DELTA,
             TableOptions::Iceberg(_) => Self::ICEBERG,
@@ -613,7 +615,7 @@ impl TryFrom for TableOptions {
             options::table_options::Options::Local(v) => TableOptions::Local(v.try_into()?),
             options::table_options::Options::Gcs(v) => TableOptions::Gcs(v.try_into()?),
             options::table_options::Options::S3(v) => TableOptions::S3(v.try_into()?),
-            options::table_options::Options::Mongo(v) => TableOptions::Mongo(v.try_into()?),
+            options::table_options::Options::Mongo(v) => TableOptions::MongoDb(v.try_into()?),
             options::table_options::Options::Snowflake(v) => TableOptions::Snowflake(v.try_into()?),
             options::table_options::Options::Delta(v) => TableOptions::Delta(v.try_into()?),
             options::table_options::Options::Iceberg(v) => TableOptions::Iceberg(v.try_into()?),
@@ -647,7 +649,7 @@ impl TryFrom for options::table_options::Options {
             TableOptions::Local(v) => options::table_options::Options::Local(v.into()),
             TableOptions::Gcs(v) => options::table_options::Options::Gcs(v.into()),
             TableOptions::S3(v) => options::table_options::Options::S3(v.into()),
-            TableOptions::Mongo(v) => options::table_options::Options::Mongo(v.into()),
+            TableOptions::MongoDb(v) => options::table_options::Options::Mongo(v.into()),
             TableOptions::Snowflake(v) => options::table_options::Options::Snowflake(v.into()),
             TableOptions::Delta(v) => options::table_options::Options::Delta(v.into()),
             TableOptions::Iceberg(v) => options::table_options::Options::Iceberg(v.into()),
@@ -943,16 +945,16 @@ impl From for options::TableOptionsS3 {
     }
 }
 #[derive(Debug, Clone, Arbitrary, PartialEq, Eq, Hash)]
-pub struct TableOptionsMongo {
+pub struct TableOptionsMongoDb {
     pub connection_string: String,
     pub database: String,
     pub collection: String,
 }
-impl TryFrom for TableOptionsMongo {
+impl TryFrom for TableOptionsMongoDb {
     type Error = ProtoConvError;
     fn try_from(value: options::TableOptionsMongo) -> Result {
-        Ok(TableOptionsMongo {
+        Ok(TableOptionsMongoDb {
             connection_string: value.connection_string,
             database: value.database,
             collection: value.collection,
@@ -960,8 +962,8 @@ impl TryFrom for TableOptionsMongo {
     }
 }
-impl From for options::TableOptionsMongo {
-    fn from(value: TableOptionsMongo) -> Self {
+impl From for options::TableOptionsMongo {
+    fn from(value: TableOptionsMongoDb) -> Self {
         options::TableOptionsMongo {
             connection_string: value.connection_string,
             database: value.database,
diff --git a/crates/sqlbuiltins/src/functions/table/mod.rs b/crates/sqlbuiltins/src/functions/table/mod.rs
index 37a1cb099..156245581 100644
--- a/crates/sqlbuiltins/src/functions/table/mod.rs
+++ b/crates/sqlbuiltins/src/functions/table/mod.rs
@@ -7,7 +7,7 @@ mod excel;
 mod generate_series;
 mod iceberg;
 mod lance;
-mod mongo;
+mod mongodb;
 mod mysql;
 mod object_store;
 mod postgres;
@@ -37,7 +37,7 @@ use self::excel::ExcelScan;
 use self::generate_series::GenerateSeries;
 use self::iceberg::{data_files::IcebergDataFiles, scan::IcebergScan, snapshots::IcebergSnapshots};
 use self::lance::LanceScan;
-use self::mongo::ReadMongoDb;
+use self::mongodb::ReadMongoDb;
 use self::mysql::ReadMysql;
 use self::object_store::{CSV_SCAN, JSON_SCAN, PARQUET_SCAN, READ_CSV, READ_JSON, READ_PARQUET};
 use self::postgres::ReadPostgres;
diff --git a/crates/sqlbuiltins/src/functions/table/mongo.rs b/crates/sqlbuiltins/src/functions/table/mongodb.rs
similarity index 92%
rename from crates/sqlbuiltins/src/functions/table/mongo.rs
rename to crates/sqlbuiltins/src/functions/table/mongodb.rs
index 43f230cde..53ad0e9cd 100644
--- a/crates/sqlbuiltins/src/functions/table/mongo.rs
+++ b/crates/sqlbuiltins/src/functions/table/mongodb.rs
@@ -7,7 +7,7 @@ use datafusion::datasource::TableProvider;
 use datafusion::logical_expr::{Signature, Volatility};
 use datafusion_ext::errors::{ExtensionError, Result};
 use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider};
-use datasources::mongodb::{MongoAccessor, MongoTableAccessInfo};
+use datasources::mongodb::{MongoDbAccessor, MongoDbTableAccessInfo};
 use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};
 use super::TableFunc;
@@ -55,11 +55,11 @@ impl TableFunc for ReadMongoDb {
                 let database: String = args.next().unwrap().try_into()?;
                 let collection: String = args.next().unwrap().try_into()?;
-                let access = MongoAccessor::connect(&conn_str)
+                let access = MongoDbAccessor::connect(&conn_str)
                     .await
                     .map_err(|e| ExtensionError::Access(Box::new(e)))?;
                 let prov = access
-                    .into_table_accessor(MongoTableAccessInfo {
+                    .into_table_accessor(MongoDbTableAccessInfo {
                         database,
                         collection,
                     })
diff --git a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs
index 891e067dc..1d151ee08 100644
--- a/crates/sqlbuiltins/src/functions/table/virtual_listing.rs
+++ b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs
@@ -13,14 +13,14 @@ use datafusion_ext::functions::{
 };
 use datasources::bigquery::BigQueryAccessor;
 use datasources::debug::DebugVirtualLister;
-use datasources::mongodb::MongoAccessor;
+use datasources::mongodb::MongoDbAccessor;
 use datasources::mysql::MysqlAccessor;
 use datasources::postgres::PostgresAccess;
 use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection};
 use datasources::sqlserver::SqlServerAccess;
 use protogen::metastore::types::catalog::{FunctionType, RuntimePreference};
 use protogen::metastore::types::options::{
-    DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsMongo, DatabaseOptionsMysql,
+    DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
     DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer,
 };
@@ -303,8 +303,8 @@ pub(crate) async fn get_virtual_lister_for_external_db(
                .map_err(|e| ExtensionError::Access(Box::new(e)))?;
            Box::new(accessor)
        }
-        DatabaseOptions::Mongo(DatabaseOptionsMongo { connection_string }) => {
-            let accessor = MongoAccessor::connect(connection_string)
+        DatabaseOptions::MongoDb(DatabaseOptionsMongoDb { connection_string }) => {
+            let accessor = MongoDbAccessor::connect(connection_string)
                .await
                .map_err(|e| ExtensionError::Access(Box::new(e)))?;
            Box::new(accessor)
diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs
index 8ac3d15ae..edd393e93 100644
--- a/crates/sqlexec/src/dispatch/external.rs
+++ b/crates/sqlexec/src/dispatch/external.rs
@@ -19,7 +19,7 @@ use datasources::debug::DebugTableType;
 use datasources::lake::delta::access::{load_table_direct, DeltaLakeAccessor};
 use datasources::lake::iceberg::table::IcebergTable;
 use datasources::lance::scan_lance_table;
-use datasources::mongodb::{MongoAccessor, MongoTableAccessInfo};
+use datasources::mongodb::{MongoDbAccessor, MongoDbTableAccessInfo};
 use datasources::mysql::{MysqlAccessor, MysqlTableAccess};
 use datasources::object_store::gcs::GcsStoreAccess;
 use datasources::object_store::generic::GenericStoreAccess;
@@ -34,12 +34,12 @@ use datasources::sqlserver::{
 use protogen::metastore::types::catalog::{CatalogEntry, DatabaseEntry, FunctionEntry, TableEntry};
 use protogen::metastore::types::options::{
     DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse, DatabaseOptionsDebug,
-    DatabaseOptionsDeltaLake, DatabaseOptionsMongo, DatabaseOptionsMysql, DatabaseOptionsPostgres,
-    DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, TableOptions, TableOptionsBigQuery,
-    TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsInternal,
-    TableOptionsLocal, TableOptionsMongo, TableOptionsMysql, TableOptionsObjectStore,
-    TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer,
-    TunnelOptions,
+    DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
+    DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, TableOptions,
+    TableOptionsBigQuery, TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs,
+    TableOptionsInternal, TableOptionsLocal, TableOptionsMongoDb, TableOptionsMysql,
+    TableOptionsObjectStore, TableOptionsPostgres, TableOptionsS3, TableOptionsSnowflake,
+    TableOptionsSqlServer, TunnelOptions,
 };
 use sqlbuiltins::builtins::DEFAULT_CATALOG;
 use sqlbuiltins::functions::FUNCTION_REGISTRY;
@@ -158,12 +158,12 @@ impl<'a> ExternalDispatcher<'a> {
                 let provider = accessor.into_table_provider(table_access, true).await?;
                 Ok(Arc::new(provider))
             }
-            DatabaseOptions::Mongo(DatabaseOptionsMongo { connection_string }) => {
-                let table_info = MongoTableAccessInfo {
+            DatabaseOptions::MongoDb(DatabaseOptionsMongoDb { connection_string }) => {
+                let table_info = MongoDbTableAccessInfo {
                     database: schema.to_string(), // A mongodb database is pretty much a schema.
                     collection: name.to_string(),
                 };
-                let accessor = MongoAccessor::connect(connection_string).await?;
+                let accessor = MongoDbAccessor::connect(connection_string).await?;
                 let table_accessor = accessor.into_table_accessor(table_info);
                 let provider = table_accessor.into_table_provider().await?;
                 Ok(Arc::new(provider))
@@ -286,16 +286,16 @@ impl<'a> ExternalDispatcher<'a> {
                 let provider = accessor.into_table_provider(table_access, true).await?;
                 Ok(Arc::new(provider))
             }
-            TableOptions::Mongo(TableOptionsMongo {
+            TableOptions::MongoDb(TableOptionsMongoDb {
                 connection_string,
                 database,
                 collection,
             }) => {
-                let table_info = MongoTableAccessInfo {
+                let table_info = MongoDbTableAccessInfo {
                     database: database.to_string(),
                     collection: collection.to_string(),
                 };
-                let accessor = MongoAccessor::connect(connection_string).await?;
+                let accessor = MongoDbAccessor::connect(connection_string).await?;
                 let table_accessor = accessor.into_table_accessor(table_info);
                 let provider = table_accessor.into_table_provider().await?;
                 Ok(Arc::new(provider))
diff --git a/crates/sqlexec/src/dispatch/mod.rs b/crates/sqlexec/src/dispatch/mod.rs
index e8527134c..43e6dad77 100644
--- a/crates/sqlexec/src/dispatch/mod.rs
+++ b/crates/sqlexec/src/dispatch/mod.rs
@@ -76,7 +76,7 @@ pub enum DispatchError {
     #[error(transparent)]
     ObjectStoreDatasource(#[from] datasources::object_store::errors::ObjectStoreSourceError),
     #[error(transparent)]
-    MongoDatasource(#[from] datasources::mongodb::errors::MongoError),
+    MongoDatasource(#[from] datasources::mongodb::errors::MongoDbError),
     #[error(transparent)]
     SnowflakeDatasource(#[from] datasources::snowflake::errors::DatasourceSnowflakeError),
     #[error(transparent)]
diff --git a/crates/sqlexec/src/parser/options.rs b/crates/sqlexec/src/parser/options.rs
index fa778f23e..80d035192 100644
--- a/crates/sqlexec/src/parser/options.rs
+++ b/crates/sqlexec/src/parser/options.rs
@@ -3,7 +3,7 @@ use std::{collections::BTreeMap, fmt};
 use datafusion::common::parsers::CompressionTypeVariant;
 use datafusion::common::FileType;
 use datafusion::sql::sqlparser::parser::ParserError;
-use datasources::{debug::DebugTableType, mongodb::MongoProtocol};
+use datasources::{debug::DebugTableType, mongodb::MongoDbProtocol};
 use protogen::metastore::types::options::StorageOptions;
 /// Contains the value parsed from Options(...).
@@ -137,13 +137,13 @@ impl ParseOptionValue for OptionValue {
     }
 }
-impl ParseOptionValue for OptionValue {
-    fn parse_opt(self) -> Result {
+impl ParseOptionValue for OptionValue {
+    fn parse_opt(self) -> Result {
         let opt = match self {
             Self::QuotedLiteral(s) | Self::UnquotedLiteral(s) => {
                 s.parse().map_err(|e| parser_err!("{e}"))?
             }
-            o => return Err(unexpected_type_err!("mongo protocol", o)),
+            o => return Err(unexpected_type_err!("mongodb protocol", o)),
         };
         Ok(opt)
     }
diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs
index e0c13f16a..035c35a18 100644
--- a/crates/sqlexec/src/planner/session_planner.rs
+++ b/crates/sqlexec/src/planner/session_planner.rs
@@ -21,7 +21,7 @@ use datasources::debug::DebugTableType;
 use datasources::lake::delta::access::{load_table_direct, DeltaLakeAccessor};
 use datasources::lake::iceberg::table::IcebergTable;
 use datasources::lance::scan_lance_table;
-use datasources::mongodb::{MongoAccessor, MongoDbConnection};
+use datasources::mongodb::{MongoDbAccessor, MongoDbConnection};
 use datasources::mysql::{MysqlAccessor, MysqlDbConnection, MysqlTableAccess};
 use datasources::object_store::gcs::GcsStoreAccess;
 use datasources::object_store::generic::GenericStoreAccess;
@@ -43,11 +43,11 @@ use protogen::metastore::types::options::{
     CopyToFormatOptionsCsv, CopyToFormatOptionsJson, CopyToFormatOptionsParquet,
     CredentialsOptions, CredentialsOptionsAws, CredentialsOptionsAzure, CredentialsOptionsDebug,
     CredentialsOptionsGcp, DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsClickhouse,
-    DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongo, DatabaseOptionsMysql,
+    DatabaseOptionsDebug, DatabaseOptionsDeltaLake, DatabaseOptionsMongoDb, DatabaseOptionsMysql,
     DatabaseOptionsPostgres, DatabaseOptionsSnowflake, DatabaseOptionsSqlServer, DeltaLakeCatalog,
     DeltaLakeUnityCatalog, StorageOptions, TableOptions, TableOptionsBigQuery,
     TableOptionsClickhouse, TableOptionsDebug, TableOptionsGcs, TableOptionsLocal,
-    TableOptionsMongo, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres,
+    TableOptionsMongoDb, TableOptionsMysql, TableOptionsObjectStore, TableOptionsPostgres,
     TableOptionsS3, TableOptionsSnowflake, TableOptionsSqlServer, TunnelOptions,
     TunnelOptionsDebug, TunnelOptionsInternal, TunnelOptionsSsh,
 };
@@ -233,15 +233,15 @@ impl<'a> SessionPlanner<'a> {
                 })?;
                 DatabaseOptions::Mysql(DatabaseOptionsMysql { connection_string })
             }
-            DatabaseOptions::MONGO => {
-                let connection_string = get_mongo_conn_str(m)?;
+            DatabaseOptions::MONGODB => {
+                let connection_string = get_mongodb_conn_str(m)?;
                 // Validate the accessor
-                MongoAccessor::validate_external_database(connection_string.as_str())
+                MongoDbAccessor::validate_external_database(connection_string.as_str())
                     .await
                     .map_err(|e| PlanError::InvalidExternalDatabase {
                         source: Box::new(e),
                     })?;
-                DatabaseOptions::Mongo(DatabaseOptionsMongo { connection_string })
+                DatabaseOptions::MongoDb(DatabaseOptionsMongoDb { connection_string })
             }
             DatabaseOptions::SNOWFLAKE => {
                 let account_name: String = m.remove_required("account")?;
@@ -439,14 +439,14 @@ impl<'a> SessionPlanner<'a> {
                     table: access.name,
                 })
             }
-            TableOptions::MONGO => {
-                let connection_string = get_mongo_conn_str(m)?;
+            TableOptions::MONGODB => {
+                let connection_string = get_mongodb_conn_str(m)?;
                 let database = m.remove_required("database")?;
                 let collection = m.remove_required("collection")?;
                 // TODO: Validate
-                TableOptions::Mongo(TableOptionsMongo {
+                TableOptions::MongoDb(TableOptionsMongoDb {
                     connection_string,
                     database,
                     collection,
@@ -2095,7 +2095,7 @@ fn get_mysql_conn_str(m: &mut StmtOptions) -> Result {
     Ok(conn.connection_string())
 }
-fn get_mongo_conn_str(m: &mut StmtOptions) -> Result {
+fn get_mongodb_conn_str(m: &mut StmtOptions) -> Result {
     let conn = match m.remove_optional("connection_string")? {
         Some(conn_str) => MongoDbConnection::ConnectionString(conn_str),
         None => {
diff --git a/scripts/create-test-mongo-db.sh b/scripts/create-test-mongo-db.sh
index f8ccd9aa3..80d104a00 100755
--- a/scripts/create-test-mongo-db.sh
+++ b/scripts/create-test-mongo-db.sh
@@ -15,7 +15,7 @@ if [[ -n "$(docker ps -a -q -f name=$CONTAINER_NAME)" ]]; then
   docker rm -f $CONTAINER_NAME > /dev/null
 fi
-# Start mongo.
+# Start mongod.
 CONTAINER_ID="$(docker run \
   -p 27017:27017 \
   --rm \
@@ -40,7 +40,7 @@ docker exec $CONTAINER_ID mongoimport \
   "mongodb://localhost:27017/${DB_NAME}" \
   /tmp/bikeshare_stations.csv 1>&2
-# The mongo docker container is kinda bad. The MONGO_INITDB_... environment vars
+# The mongod docker container is kinda bad. The MONGO_INITDB_... environment vars
 # might look like the obvious solution, but they don't work as you would expect.
 #
 # See https://github.com/docker-library/mongo/issues/329