Skip to content

Commit

Permalink
feat(rust/drivers/datafusion): add support for bulk ingest (#2279)
Browse files Browse the repository at this point in the history
- swaps todos from Optionable impls to partial support: none for
Database, CurrentCatalog and CurrentSchema for Connection and
IngestTableTarget for Statement.
- adds support for binding a single batch and executing a bulk insert on
it. I used `RefCell` to pass bound record batch from `bind` to an
`execute_update` method.
  • Loading branch information
tokoko authored Nov 5, 2024
1 parent cd816e6 commit d07b9f0
Show file tree
Hide file tree
Showing 2 changed files with 257 additions and 57 deletions.
245 changes: 190 additions & 55 deletions rust/drivers/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

#![allow(refining_impl_trait)]

use adbc_core::ffi::constants;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::TableType;
use datafusion::prelude::*;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::substrait::proto::Plan;
use prost::Message;
use std::fmt::Debug;
use std::sync::Arc;
use std::vec::IntoIter;
use std::{collections::HashMap, fmt::Debug};
use tokio::runtime::Runtime;

use arrow_array::builder::{
Expand Down Expand Up @@ -113,9 +115,7 @@ impl Driver for DataFusionDriver {
type DatabaseType = DataFusionDatabase;

fn new_database(&mut self) -> Result<Self::DatabaseType> {
Ok(Self::DatabaseType {
options: HashMap::new(),
})
Ok(Self::DatabaseType {})
}

fn new_database_with_opts(
Expand All @@ -127,46 +127,56 @@ impl Driver for DataFusionDriver {
),
>,
) -> adbc_core::error::Result<Self::DatabaseType> {
let mut database = Self::DatabaseType {
options: HashMap::new(),
};
let mut database = Self::DatabaseType {};
for (key, value) in opts {
database.set_option(key, value)?;
}
Ok(database)
}
}

pub struct DataFusionDatabase {
options: HashMap<OptionDatabase, OptionValue>,
}
pub struct DataFusionDatabase {}

impl Optionable for DataFusionDatabase {
type Option = OptionDatabase;

fn set_option(
&mut self,
_key: Self::Option,
key: Self::Option,
_value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
self.options.insert(_key, _value);
Ok(())
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}
}

Expand All @@ -189,7 +199,7 @@ impl Database for DataFusionDatabase {

fn new_connection_with_opts(
&mut self,
_opts: impl IntoIterator<
opts: impl IntoIterator<
Item = (
adbc_core::options::OptionConnection,
adbc_core::options::OptionValue,
Expand All @@ -203,10 +213,16 @@ impl Database for DataFusionDatabase {
.build()
.unwrap();

Ok(DataFusionConnection {
let mut connection = DataFusionConnection {
runtime: Arc::new(runtime),
ctx: Arc::new(ctx),
})
};

for (key, value) in opts {
connection.set_option(key, value)?;
}

Ok(connection)
}
}

Expand All @@ -220,26 +236,85 @@ impl Optionable for DataFusionConnection {

fn set_option(
&mut self,
_key: Self::Option,
_value: adbc_core::options::OptionValue,
key: Self::Option,
value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
todo!()
match key.as_ref() {
constants::ADBC_CONNECTION_OPTION_CURRENT_CATALOG => match value {
OptionValue::String(value) => {
self.runtime.block_on(async {
let query = format!("SET datafusion.catalog.default_catalog = {value}");
self.ctx.sql(query.as_str()).await.unwrap();
});
Ok(())
}
_ => Err(Error::with_message_and_status(
"CurrentCatalog value must be of type String",
Status::InvalidArguments,
)),
},
constants::ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA => match value {
OptionValue::String(value) => {
self.runtime.block_on(async {
let query = format!("SET datafusion.catalog.default_schema = {value}");
self.ctx.sql(query.as_str()).await.unwrap();
});
Ok(())
}
_ => Err(Error::with_message_and_status(
"CurrentSchema value must be of type String",
Status::InvalidArguments,
)),
},
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
match key.as_ref() {
constants::ADBC_CONNECTION_OPTION_CURRENT_CATALOG => Ok(self
.ctx
.state()
.config_options()
.catalog
.default_catalog
.clone()),
constants::ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA => Ok(self
.ctx
.state()
.config_options()
.catalog
.default_schema
.clone()),
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}
}

Expand Down Expand Up @@ -622,6 +697,8 @@ impl Connection for DataFusionConnection {
ctx: self.ctx.clone(),
sql_query: None,
substrait_plan: None,
bound_record_batch: None,
ingest_target_table: None,
})
}

Expand Down Expand Up @@ -707,39 +784,81 @@ pub struct DataFusionStatement {
ctx: Arc<SessionContext>,
sql_query: Option<String>,
substrait_plan: Option<Plan>,
bound_record_batch: Option<RecordBatch>,
ingest_target_table: Option<String>,
}

impl Optionable for DataFusionStatement {
type Option = OptionStatement;

fn set_option(
&mut self,
_key: Self::Option,
_value: adbc_core::options::OptionValue,
key: Self::Option,
value: adbc_core::options::OptionValue,
) -> adbc_core::error::Result<()> {
todo!()
match key.as_ref() {
constants::ADBC_INGEST_OPTION_TARGET_TABLE => match value {
OptionValue::String(value) => {
self.ingest_target_table = Some(value);
Ok(())
}
_ => Err(Error::with_message_and_status(
"IngestOptionTargetTable value must be of type String",
Status::InvalidArguments,
)),
},
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_string(&self, _key: Self::Option) -> adbc_core::error::Result<String> {
todo!()
fn get_option_string(&self, key: Self::Option) -> adbc_core::error::Result<String> {
match key.as_ref() {
constants::ADBC_INGEST_OPTION_TARGET_TABLE => {
let target_table = self.ingest_target_table.clone();
match target_table {
Some(table) => Ok(table),
None => Err(Error::with_message_and_status(
format!("{key:?} has not been set"),
Status::NotFound,
)),
}
}
_ => Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
)),
}
}

fn get_option_bytes(&self, _key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
todo!()
fn get_option_bytes(&self, key: Self::Option) -> adbc_core::error::Result<Vec<u8>> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_int(&self, _key: Self::Option) -> adbc_core::error::Result<i64> {
todo!()
fn get_option_int(&self, key: Self::Option) -> adbc_core::error::Result<i64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}

fn get_option_double(&self, _key: Self::Option) -> adbc_core::error::Result<f64> {
todo!()
fn get_option_double(&self, key: Self::Option) -> adbc_core::error::Result<f64> {
Err(Error::with_message_and_status(
format!("Unrecognized option: {key:?}"),
Status::NotFound,
))
}
}

impl Statement for DataFusionStatement {
fn bind(&mut self, _batch: arrow_array::RecordBatch) -> adbc_core::error::Result<()> {
todo!()
fn bind(&mut self, batch: arrow_array::RecordBatch) -> adbc_core::error::Result<()> {
self.bound_record_batch.replace(batch);
Ok(())
}

fn bind_stream(
Expand Down Expand Up @@ -768,13 +887,29 @@ impl Statement for DataFusionStatement {
}

fn execute_update(&mut self) -> adbc_core::error::Result<Option<i64>> {
self.runtime.block_on(async {
let _ = self
.ctx
.sql(&self.sql_query.clone().unwrap())
.await
.unwrap();
});
if self.sql_query.is_some() {
self.runtime.block_on(async {
let _ = self
.ctx
.sql(&self.sql_query.clone().unwrap())
.await
.unwrap();
});
} else if let Some(batch) = self.bound_record_batch.take() {
self.runtime.block_on(async {
let table = match self.ingest_target_table.clone() {
Some(table) => table,
None => todo!(),
};

self.ctx
.read_batch(batch)
.unwrap()
.write_table(table.as_str(), DataFrameWriteOptions::new())
.await
.unwrap();
});
}

Ok(Some(0))
}
Expand Down
Loading

0 comments on commit d07b9f0

Please sign in to comment.