Skip to content

Commit

Permalink
chore: mongo -> mongodb in symbol naming (#2343)
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish authored Jan 4, 2024
1 parent 21ad100 commit 7c19b8b
Show file tree
Hide file tree
Showing 14 changed files with 108 additions and 106 deletions.
6 changes: 3 additions & 3 deletions crates/datasources/src/mongodb/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[derive(Debug, thiserror::Error)]
pub enum MongoError {
pub enum MongoDbError {
#[error("Failed to merge schemas: {0}")]
FailedSchemaMerge(datafusion::arrow::error::ArrowError),

Expand All @@ -19,7 +19,7 @@ pub enum MongoError {
Bson(#[from] crate::bson::errors::BsonError),

#[error(transparent)]
RawBSON(#[from] mongodb::bson::raw::Error),
RawBson(#[from] mongodb::bson::raw::Error),
}

pub type Result<T, E = MongoError> = std::result::Result<T, E>;
pub type Result<T, E = MongoDbError> = std::result::Result<T, E>;
18 changes: 9 additions & 9 deletions crates/datasources/src/mongodb/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,24 @@ use mongodb::Cursor;

use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;

use super::errors::{MongoError, Result};
use super::errors::{MongoDbError, Result};
use crate::bson::builder::RecordStructBuilder;

#[derive(Debug)]
pub struct MongoBsonExec {
pub struct MongoDbBsonExec {
cursor: Mutex<Option<Cursor<RawDocumentBuf>>>,
schema: Arc<ArrowSchema>,
limit: Option<usize>,
metrics: ExecutionPlanMetricsSet,
}

impl MongoBsonExec {
impl MongoDbBsonExec {
pub fn new(
cursor: Mutex<Option<Cursor<RawDocumentBuf>>>,
schema: Arc<ArrowSchema>,
limit: Option<usize>,
) -> MongoBsonExec {
MongoBsonExec {
) -> MongoDbBsonExec {
MongoDbBsonExec {
cursor,
schema,
limit,
Expand All @@ -48,7 +48,7 @@ impl MongoBsonExec {
}
}

impl ExecutionPlan for MongoBsonExec {
impl ExecutionPlan for MongoDbBsonExec {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -74,7 +74,7 @@ impl ExecutionPlan for MongoBsonExec {
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> DatafusionResult<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::Execution(
"cannot replace children for MongoDBExec".to_string(),
"cannot replace children for MongoDB Exec".to_string(),
))
}

Expand Down Expand Up @@ -114,7 +114,7 @@ impl ExecutionPlan for MongoBsonExec {
}
}

impl DisplayAs for MongoBsonExec {
impl DisplayAs for MongoDbBsonExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MongoBsonExec")
}
Expand Down Expand Up @@ -174,7 +174,7 @@ impl RecordBatchStream for BsonStream {
}
}

fn document_chunk_to_record_batch<E: Into<MongoError>>(
fn document_chunk_to_record_batch<E: Into<MongoDbError>>(
chunk: Vec<Result<RawDocumentBuf, E>>,
fields: Fields,
) -> Result<RecordBatch> {
Expand Down
58 changes: 29 additions & 29 deletions crates/datasources/src/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ mod infer;

use datafusion_ext::errors::ExtensionError;
use datafusion_ext::functions::VirtualLister;
use errors::{MongoError, Result};
use exec::MongoBsonExec;
use errors::{MongoDbError, Result};
use exec::MongoDbBsonExec;
use infer::TableSampler;

use async_trait::async_trait;
Expand Down Expand Up @@ -35,36 +35,36 @@ use tracing::debug;
const ID_FIELD_NAME: &str = "_id";

#[derive(Debug)]
pub enum MongoProtocol {
pub enum MongoDbProtocol {
MongoDb,
MongoDbSrv,
}

impl Default for MongoProtocol {
impl Default for MongoDbProtocol {
fn default() -> Self {
Self::MongoDb
}
}

impl MongoProtocol {
impl MongoDbProtocol {
const MONGODB: &'static str = "mongodb";
const MONGODB_SRV: &'static str = "mongodb+srv";
}

impl FromStr for MongoProtocol {
type Err = MongoError;
impl FromStr for MongoDbProtocol {
type Err = MongoDbError;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let proto = match s {
Self::MONGODB => Self::MongoDb,
Self::MONGODB_SRV => Self::MongoDbSrv,
s => return Err(MongoError::InvalidProtocol(s.to_owned())),
s => return Err(MongoDbError::InvalidProtocol(s.to_owned())),
};
Ok(proto)
}
}

impl Display for MongoProtocol {
impl Display for MongoDbProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::MongoDb => Self::MONGODB,
Expand All @@ -78,7 +78,7 @@ impl Display for MongoProtocol {
pub enum MongoDbConnection {
ConnectionString(String),
Parameters {
protocol: MongoProtocol,
protocol: MongoDbProtocol,
host: String,
port: Option<u16>,
user: String,
Expand Down Expand Up @@ -107,7 +107,7 @@ impl MongoDbConnection {
}
// Address
write!(&mut conn_str, "@{host}").unwrap();
if matches!(protocol, MongoProtocol::MongoDb) {
if matches!(protocol, MongoDbProtocol::MongoDb) {
// Only attempt to write port if the protocol is "mongodb"
if let Some(port) = port {
write!(&mut conn_str, ":{port}").unwrap();
Expand All @@ -120,17 +120,17 @@ impl MongoDbConnection {
}

#[derive(Debug, Clone)]
pub struct MongoAccessor {
pub struct MongoDbAccessor {
client: Client,
}

impl MongoAccessor {
pub async fn connect(connection_string: &str) -> Result<MongoAccessor> {
impl MongoDbAccessor {
pub async fn connect(connection_string: &str) -> Result<MongoDbAccessor> {
let mut opts = ClientOptions::parse(connection_string).await?;
opts.app_name = Some("GlareDB (MongoDB Data source)".to_string());
let client = Client::with_options(opts)?;

Ok(MongoAccessor { client })
Ok(MongoDbAccessor { client })
}

pub async fn validate_external_database(connection_string: &str) -> Result<()> {
Expand All @@ -144,16 +144,16 @@ impl MongoAccessor {
Ok(())
}

pub fn into_table_accessor(self, info: MongoTableAccessInfo) -> MongoTableAccessor {
MongoTableAccessor {
pub fn into_table_accessor(self, info: MongoDbTableAccessInfo) -> MongoDbTableAccessor {
MongoDbTableAccessor {
info,
client: self.client,
}
}
}

#[async_trait]
impl VirtualLister for MongoAccessor {
impl VirtualLister for MongoDbAccessor {
async fn list_schemas(&self) -> Result<Vec<String>, ExtensionError> {
use ExtensionError::ListingErrBoxed;

Expand Down Expand Up @@ -198,18 +198,18 @@ impl VirtualLister for MongoAccessor {
}

#[derive(Debug, Clone)]
pub struct MongoTableAccessInfo {
pub struct MongoDbTableAccessInfo {
pub database: String, // "Schema"
pub collection: String,
}

#[derive(Debug, Clone)]
pub struct MongoTableAccessor {
info: MongoTableAccessInfo,
pub struct MongoDbTableAccessor {
info: MongoDbTableAccessInfo,
client: Client,
}

impl MongoTableAccessor {
impl MongoDbTableAccessor {
/// Validate that we can access the table.
pub async fn validate(&self) -> Result<()> {
let _ = self
Expand All @@ -222,7 +222,7 @@ impl MongoTableAccessor {
Ok(())
}

pub async fn into_table_provider(self) -> Result<MongoTableProvider> {
pub async fn into_table_provider(self) -> Result<MongoDbTableProvider> {
let collection = self
.client
.database(&self.info.database)
Expand All @@ -231,7 +231,7 @@ impl MongoTableAccessor {

let schema = sampler.infer_schema_from_sample().await?;

Ok(MongoTableProvider {
Ok(MongoDbTableProvider {
schema: Arc::new(schema),
collection: self
.client
Expand All @@ -241,13 +241,13 @@ impl MongoTableAccessor {
}
}

pub struct MongoTableProvider {
pub struct MongoDbTableProvider {
schema: Arc<ArrowSchema>,
collection: Collection<RawDocumentBuf>,
}

#[async_trait]
impl TableProvider for MongoTableProvider {
impl TableProvider for MongoDbTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down Expand Up @@ -317,7 +317,7 @@ impl TableProvider for MongoTableProvider {
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?,
));
Ok(Arc::new(MongoBsonExec::new(cursor, schema, limit)))
Ok(Arc::new(MongoDbBsonExec::new(cursor, schema, limit)))
}
}

Expand Down Expand Up @@ -427,7 +427,7 @@ mod tests {
assert_eq!(&conn_str, "mongodb://prod:password123@127.0.0.1:5432");

let conn_str = MongoDbConnection::Parameters {
protocol: MongoProtocol::MongoDb,
protocol: MongoDbProtocol::MongoDb,
host: "127.0.0.1".to_string(),
port: Some(5432),
user: "prod".to_string(),
Expand All @@ -437,7 +437,7 @@ mod tests {
assert_eq!(&conn_str, "mongodb://prod:password123@127.0.0.1:5432");

let conn_str = MongoDbConnection::Parameters {
protocol: MongoProtocol::MongoDbSrv,
protocol: MongoDbProtocol::MongoDbSrv,
host: "127.0.0.1".to_string(),
port: Some(5432),
user: "prod".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion crates/datasources/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ pub fn init_session_registry<'a>(
| TableOptions::Postgres(_)
| TableOptions::BigQuery(_)
| TableOptions::Mysql(_)
| TableOptions::Mongo(_)
| TableOptions::MongoDb(_)
| TableOptions::Snowflake(_)
| TableOptions::SqlServer(_)
| TableOptions::Clickhouse(_) => continue,
Expand Down
4 changes: 2 additions & 2 deletions crates/protogen/proto/metastore/options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ message DatabaseOptions {
DatabaseOptionsPostgres postgres = 3;
DatabaseOptionsBigQuery bigquery = 4;
DatabaseOptionsMysql mysql = 5;
DatabaseOptionsMongo mongo = 6;
DatabaseOptionsMongoDb mongodb = 6;
DatabaseOptionsSnowflake snowflake = 7;
DatabaseOptionsDeltaLake delta = 8;
DatabaseOptionsSqlServer sql_server = 9;
Expand All @@ -68,7 +68,7 @@ message DatabaseOptionsMysql {
string connection_string = 1;
}

message DatabaseOptionsMongo {
message DatabaseOptionsMongoDb {
string connection_string = 1;
}

Expand Down
Loading

0 comments on commit 7c19b8b

Please sign in to comment.