From e3408229c8540a26ddcecdeecdd0d9373a0c1def Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao" <jorgecarleitao@gmail.com>
Date: Sat, 19 Feb 2022 23:30:58 +0000
Subject: [PATCH] Added example

---
 examples/io_odbc.rs         | 82 +++++++++++++++++++++++++++++++++++++
 src/io/odbc/mod.rs          |  6 +++
 src/io/odbc/write/mod.rs    | 46 +++++++++++++++++++--
 src/io/odbc/write/schema.rs |  2 +-
 4 files changed, 131 insertions(+), 5 deletions(-)
 create mode 100644 examples/io_odbc.rs

diff --git a/examples/io_odbc.rs b/examples/io_odbc.rs
new file mode 100644
index 00000000000..996fa7130c6
--- /dev/null
+++ b/examples/io_odbc.rs
@@ -0,0 +1,82 @@
+//! Example showing how to write to, and read from, an ODBC connector
+//!
+//! On Ubuntu, you need to run the following (to install the driver):
+//! ```bash
+//! sudo apt install libsqliteodbc sqlite3 unixodbc-dev
+//! sudo sed --in-place 's/libsqlite3odbc.so/\/usr\/lib\/x86_64-linux-gnu\/odbc\/libsqlite3odbc.so/' /etc/odbcinst.ini
+//! ```
+use arrow2::array::{Array, Int32Array, Utf8Array};
+use arrow2::chunk::Chunk;
+use arrow2::datatypes::{DataType, Field};
+use arrow2::error::Result;
+use arrow2::io::odbc::api;
+use arrow2::io::odbc::api::Cursor;
+use arrow2::io::odbc::read;
+use arrow2::io::odbc::write;
+
+fn main() -> Result<()> {
+    let connector = "Driver={SQLite3};Database=sqlite-test.db";
+    let env = api::Environment::new()?;
+    let connection = env.connect_with_connection_string(connector)?;
+
+    // let's create an empty table with a schema
+    connection.execute("DROP TABLE IF EXISTS example;", ())?;
+    connection.execute("CREATE TABLE example (c1 INT, c2 TEXT);", ())?;
+
+    // and now let's write some data into it (from arrow arrays!)
+    // first, we prepare the statement
+    let query = "INSERT INTO example (c1, c2) VALUES (?, ?)";
+    let prepared = connection.prepare(query).unwrap();
+
+    // first, initialize buffers from odbc-api
+    let fields = vec![
+        // (for now) the types here must match the table's schema
+        Field::new("unused", DataType::Int32, true),
+        Field::new("unused", DataType::LargeUtf8, true),
+    ];
+
+    let mut writer = write::Writer::try_new(prepared, fields)?;
+
+    // say we have (or receive from a channel) a chunk:
+    let chunk = Chunk::new(vec![
+        Box::new(Int32Array::from_slice([1, 2, 3])) as Box<dyn Array>,
+        Box::new(Utf8Array::<i32>::from([Some("Hello"), None, Some("World")])),
+    ]);
+
+    // we write it like this
+    writer.write(&chunk)?;
+
+    // and we can later read from it
+    let chunks = read(&connection, "SELECT c1 FROM example")?;
+
+    // and the result should be the same
+    assert_eq!(chunks[0].columns()[0], chunk.columns()[0]);
+
+    Ok(())
+}
+
+/// Reads chunks from a query done against an ODBC connection
+pub fn read(connection: &api::Connection<'_>, query: &str) -> Result<Vec<Chunk<Box<dyn Array>>>> {
+    let mut a = connection.prepare(query)?;
+    let fields = read::infer_schema(&a)?;
+
+    let max_batch_size = 100;
+    let buffer = read::buffer_from_metadata(&a, max_batch_size)?;
+
+    let cursor = a.execute(())?.unwrap();
+    let mut cursor = cursor.bind_buffer(buffer)?;
+
+    let mut chunks = vec![];
+    while let Some(batch) = cursor.fetch()? {
+        let arrays = (0..batch.num_cols())
+            .zip(fields.iter())
+            .map(|(index, field)| {
+                let column_view = batch.column(index);
+                read::deserialize(column_view, field.data_type.clone())
+            })
+            .collect::<Vec<_>>();
+        chunks.push(Chunk::new(arrays));
+    }
+
+    Ok(chunks)
+}
diff --git a/src/io/odbc/mod.rs b/src/io/odbc/mod.rs
index 1407cf1e3d9..d681ed3c2fd 100644
--- a/src/io/odbc/mod.rs
+++ b/src/io/odbc/mod.rs
@@ -3,3 +3,9 @@ pub use odbc_api as api;
 
 pub mod read;
 pub mod write;
+
+impl From<api::Error> for crate::error::ArrowError {
+    fn from(error: api::Error) -> Self {
+        crate::error::ArrowError::External("".to_string(), Box::new(error))
+    }
+}
diff --git a/src/io/odbc/write/mod.rs b/src/io/odbc/write/mod.rs
index 7f2e5e4a5c3..18d2ebdbdaf 100644
--- a/src/io/odbc/write/mod.rs
+++ b/src/io/odbc/write/mod.rs
@@ -2,6 +2,8 @@
 mod schema;
 mod serialize;
 
+use crate::{array::Array, chunk::Chunk, datatypes::Field, error::Result};
+
 use super::api;
 pub use schema::infer_descriptions;
 pub use serialize::serialize;
@@ -9,16 +11,52 @@ pub use serialize::serialize;
 /// Creates a [`api::buffers::ColumnarBuffer`] from [`api::ColumnDescription`]s.
 pub fn buffer_from_description(
     descriptions: Vec<api::ColumnDescription>,
-    max_batch_size: usize,
+    capacity: usize,
 ) -> api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer> {
     let descs = descriptions
         .into_iter()
         .map(|description| api::buffers::BufferDescription {
             nullable: description.could_be_nullable(),
             kind: api::buffers::BufferKind::from_data_type(description.data_type).unwrap(),
         });
 
-    let mut buffer = api::buffers::buffer_from_description(max_batch_size, descs);
-    buffer.set_num_rows(max_batch_size);
-    buffer
+    api::buffers::buffer_from_description(capacity, descs)
+}
+
+/// A writer of [`Chunk`] to an ODBC prepared statement.
+pub struct Writer<'a> {
+    fields: Vec<Field>,
+    buffer: api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer>,
+    prepared: api::Prepared<'a>,
+}
+
+impl<'a> Writer<'a> {
+    /// Creates a new [`Writer`]
+    pub fn try_new(prepared: api::Prepared<'a>, fields: Vec<Field>) -> Result<Self> {
+        let buffer = buffer_from_description(infer_descriptions(&fields)?, 0);
+        Ok(Self {
+            fields,
+            buffer,
+            prepared,
+        })
+    }
+
+    /// Writes a chunk to the writer.
+    pub fn write<A: AsRef<dyn Array>>(&mut self, chunk: &Chunk<A>) -> Result<()> {
+        if chunk.len() > self.buffer.num_rows() {
+            // if the chunk is larger, we re-allocate new buffers to hold it
+            self.buffer = buffer_from_description(infer_descriptions(&self.fields)?, chunk.len());
+        }
+
+        self.buffer.set_num_rows(chunk.len());
+
+        // serialize (CPU-bounded)
+        for (i, column) in chunk.arrays().iter().enumerate() {
+            serialize(column.as_ref(), &mut self.buffer.column_mut(i))?;
+        }
+
+        // write (IO-bounded)
+        self.prepared.execute(&self.buffer)?;
+        Ok(())
+    }
 }
diff --git a/src/io/odbc/write/schema.rs b/src/io/odbc/write/schema.rs
index 46bf1d0e900..546c6d2b427 100644
--- a/src/io/odbc/write/schema.rs
+++ b/src/io/odbc/write/schema.rs
@@ -32,7 +32,7 @@ fn data_type_to(data_type: &DataType) -> Result<api::DataType> {
         DataType::Float64 => api::DataType::Float { precision: 53 },
         DataType::FixedSizeBinary(length) => api::DataType::Binary { length: *length },
         DataType::Binary | DataType::LargeBinary => api::DataType::Varbinary { length: 0 },
-        DataType::Utf8 | DataType::LargeUtf8 => api::DataType::LongVarchar { length: 0 },
+        DataType::Utf8 | DataType::LargeUtf8 => api::DataType::Varchar { length: 0 },
         other => return Err(ArrowError::nyi(format!("{other:?} to ODBC"))),
     })
 }
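A note on the `Writer` added above: `try_new` allocates its ODBC buffers with capacity 0, so nothing is committed until the first chunk arrives, and `write` re-allocates only when a chunk is larger than any seen before, so repeated writes of similar-sized chunks reuse the same buffers. A minimal sketch of that streaming pattern, assuming the `example` table and `fields` from the example above (`insert_all` is a hypothetical helper, not part of the patch):

```rust
use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::odbc::{api, write};

/// Hypothetical helper: streams any number of chunks through one prepared statement.
fn insert_all(
    connection: &api::Connection<'_>,
    fields: Vec<Field>,
    chunks: Vec<Chunk<Box<dyn Array>>>,
) -> Result<()> {
    // same INSERT statement as in the example above
    let prepared = connection.prepare("INSERT INTO example (c1, c2) VALUES (?, ?)")?;
    let mut writer = write::Writer::try_new(prepared, fields)?;
    for chunk in chunks {
        // buffers grow only if `chunk` is larger than any chunk written before it;
        // otherwise serialization reuses the existing allocation
        writer.write(&chunk)?;
    }
    Ok(())
}
```

The `connection.prepare(...)?` line relies on the `From<api::Error> for ArrowError` impl added in `src/io/odbc/mod.rs`, which is what lets `?` mix `odbc-api` and `arrow2` errors in one function.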
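On the read side, the example's `read` helper fetches in batches: each `cursor.fetch()` fills the bound buffer with up to `max_batch_size` (here 100) rows and becomes one `Chunk`, so a query returning more rows than that yields several chunks. A small sketch, assuming it sits in the same file as the example so `read` is in scope (`count_rows` is hypothetical):

```rust
use arrow2::error::Result;
use arrow2::io::odbc::api;

/// Hypothetical helper: totals the rows of a batched ODBC read.
fn count_rows(connection: &api::Connection<'_>) -> Result<usize> {
    // one `Chunk` per fetched batch, each holding at most 100 rows
    let chunks = read(connection, "SELECT c1, c2 FROM example")?;
    Ok(chunks.iter().map(|chunk| chunk.len()).sum())
}
```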