Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added example
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 19, 2022
1 parent 606afd3 commit e340822
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 5 deletions.
82 changes: 82 additions & 0 deletions examples/io_odbc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! Example showing how to write to, and read from, an ODBC connector
//!
//! On Ubuntu, you need to run the following (to install the driver):
//! ```bash
//! sudo apt install libsqliteodbc sqlite3 unixodbc-dev
//! sudo sed --in-place 's/libsqlite3odbc.so/\/usr\/lib\/x86_64-linux-gnu\/odbc\/libsqlite3odbc.so/' /etc/odbcinst.ini
//! ```
use arrow2::array::{Array, Int32Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field};
use arrow2::error::Result;
use arrow2::io::odbc::api;
use arrow2::io::odbc::api::Cursor;
use arrow2::io::odbc::read;
use arrow2::io::odbc::write;

/// Writes a small chunk into an SQLite table through ODBC and reads one column back.
///
/// # Errors
/// Returns an error if connecting, executing any statement, writing or reading fails.
fn main() -> Result<()> {
    let connector = "Driver={SQLite3};Database=sqlite-test.db";
    let env = api::Environment::new()?;
    let connection = env.connect_with_connection_string(connector)?;

    // let's create an empty table with a schema
    connection.execute("DROP TABLE IF EXISTS example;", ())?;
    connection.execute("CREATE TABLE example (c1 INT, c2 TEXT);", ())?;

    // and now let's write some data into it (from arrow arrays!)
    // first, we prepare the statement
    let query = "INSERT INTO example (c1, c2) VALUES (?, ?)";
    // preparing a statement talks to the driver and can fail: propagate instead of panicking
    let prepared = connection.prepare(query)?;

    // second, we declare the fields describing the columns being written
    let fields = vec![
        // (for now) the types here must match the tables' schema
        Field::new("unused", DataType::Int32, true),
        Field::new("unused", DataType::LargeUtf8, true),
    ];

    // third, we create the writer from the prepared statement and the fields
    let mut writer = write::Writer::try_new(prepared, fields)?;

    // say we have (or receive from a channel) a chunk:
    let chunk = Chunk::new(vec![
        Box::new(Int32Array::from_slice([1, 2, 3])) as Box<dyn Array>,
        Box::new(Utf8Array::<i64>::from([Some("Hello"), None, Some("World")])),
    ]);

    // we write it like this
    writer.write(&chunk)?;

    // and we can later read from it
    let chunks = read(&connection, "SELECT c1 FROM example")?;

    // and the result should be the same
    assert_eq!(chunks[0].columns()[0], chunk.columns()[0]);

    Ok(())
}

/// Reads chunks from a query done against an ODBC connection
pub fn read(connection: &api::Connection<'_>, query: &str) -> Result<Vec<Chunk<Box<dyn Array>>>> {
let mut a = connection.prepare(query)?;
let fields = read::infer_schema(&a)?;

let max_batch_size = 100;
let buffer = read::buffer_from_metadata(&a, max_batch_size)?;

let cursor = a.execute(())?.unwrap();
let mut cursor = cursor.bind_buffer(buffer)?;

let mut chunks = vec![];
while let Some(batch) = cursor.fetch()? {
let arrays = (0..batch.num_cols())
.zip(fields.iter())
.map(|(index, field)| {
let column_view = batch.column(index);
read::deserialize(column_view, field.data_type.clone())
})
.collect::<Vec<_>>();
chunks.push(Chunk::new(arrays));
}

Ok(chunks)
}
6 changes: 6 additions & 0 deletions src/io/odbc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ pub use odbc_api as api;

pub mod read;
pub mod write;

/// Converts an ODBC error into an [`crate::error::ArrowError::External`] carrying an empty
/// context message, so that ODBC calls can be propagated with `?`.
impl From<api::Error> for crate::error::ArrowError {
    fn from(error: api::Error) -> Self {
        let source = Box::new(error);
        Self::External(String::new(), source)
    }
}
46 changes: 42 additions & 4 deletions src/io/odbc/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
mod schema;
mod serialize;

use crate::{array::Array, chunk::Chunk, datatypes::Field, error::Result};

use super::api;
pub use schema::infer_descriptions;
pub use serialize::serialize;

/// Creates a [`api::buffers::ColumnarBuffer`] from [`api::ColumnDescription`]s.
pub fn buffer_from_description(
descriptions: Vec<api::ColumnDescription>,
max_batch_size: usize,
capacity: usize,
) -> api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer> {
let descs = descriptions
.into_iter()
Expand All @@ -18,7 +20,43 @@ pub fn buffer_from_description(
kind: api::buffers::BufferKind::from_data_type(description.data_type).unwrap(),
});

let mut buffer = api::buffers::buffer_from_description(max_batch_size, descs);
buffer.set_num_rows(max_batch_size);
buffer
api::buffers::buffer_from_description(capacity, descs)
}

/// A writer of [`Chunk`] to an ODBC prepared statement.
pub struct Writer<'a> {
    /// The fields from which the ODBC column descriptions of `buffer` are inferred.
    fields: Vec<Field>,
    /// The columnar buffer that chunks are serialized into before being executed.
    buffer: api::buffers::ColumnarBuffer<api::buffers::AnyColumnBuffer>,
    /// The prepared statement that is executed with `buffer` as its parameters.
    prepared: api::Prepared<'a>,
}

impl<'a> Writer<'a> {
/// Creates a new [`Writer`]
pub fn try_new(prepared: api::Prepared<'a>, fields: Vec<Field>) -> Result<Self> {
let buffer = buffer_from_description(infer_descriptions(&fields)?, 0);
Ok(Self {
fields,
buffer,
prepared,
})
}

/// Writes a chunk to the writter.
pub fn write<A: AsRef<dyn Array>>(&mut self, chunk: &Chunk<A>) -> Result<()> {
if chunk.len() > self.buffer.num_rows() {
// if the chunk is larger, we re-allocate new buffers to hold it
self.buffer = buffer_from_description(infer_descriptions(&self.fields)?, chunk.len());
}

self.buffer.set_num_rows(chunk.len());

// serialize (CPU-bounded)
for (i, column) in chunk.arrays().iter().enumerate() {
serialize(column.as_ref(), &mut self.buffer.column_mut(i))?;
}

// write (IO-bounded)
self.prepared.execute(&self.buffer)?;
Ok(())
}
}
2 changes: 1 addition & 1 deletion src/io/odbc/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn data_type_to(data_type: &DataType) -> Result<api::DataType> {
DataType::Float64 => api::DataType::Float { precision: 53 },
DataType::FixedSizeBinary(length) => api::DataType::Binary { length: *length },
DataType::Binary | DataType::LargeBinary => api::DataType::Varbinary { length: 0 },
DataType::Utf8 | DataType::LargeUtf8 => api::DataType::LongVarchar { length: 0 },
DataType::Utf8 | DataType::LargeUtf8 => api::DataType::Varchar { length: 0 },
other => return Err(ArrowError::nyi(format!("{other:?} to ODBC"))),
})
}

0 comments on commit e340822

Please sign in to comment.