Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 5, 2024
1 parent eea7ec6 commit 6c3d08c
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 46 deletions.
12 changes: 11 additions & 1 deletion integrations/parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ homepage = "https://opendal.apache.org/"
license = "Apache-2.0"
repository = "https://github.com/apache/opendal"
rust-version = "1.75"
version = "0.45.0"
version = "0.0.1"

[features]
default = ["arrow"]
arrow = ["dep:arrow"]

[dependencies]
async-trait = "0.1"
Expand All @@ -36,6 +40,7 @@ parquet = { version = "52.0", default-features = false, features = [
"async",
"arrow",
] }
arrow = { version = "52.0", optional = true }

[dev-dependencies]
opendal = { version = "0.48.0", path = "../../core", features = [
Expand All @@ -44,3 +49,8 @@ opendal = { version = "0.48.0", path = "../../core", features = [
] }
rand = "0.8.5"
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }

[[example]]
name = "async_writer"
path = "examples/async_writer.rs"
required-features = ["arrow"]
45 changes: 45 additions & 0 deletions integrations/parquet/examples/async_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};

use opendal::{services::S3Config, Operator};
use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
use parquet_opendal::AsyncWriter;

#[tokio::main]
async fn main() {
let mut cfg = S3Config::default();
cfg.access_key_id = Some("my_access_key".to_string());
cfg.secret_access_key = Some("my_secret_key".to_string());
cfg.endpoint = Some("my_endpoint".to_string());
cfg.region = Some("my_region".to_string());
cfg.bucket = "my_bucket".to_string();

// Create a new operator
let operator = Operator::from_config(cfg).unwrap().finish();
let path = "/path/to/file.parquet";

// Create an async writer
let writer = AsyncWriter::new(
operator
.writer_with(path)
.chunk(32 * 1024 * 1024)
.concurrent(8)
.await
.unwrap(),
);

let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
writer.write(&to_write).await.unwrap();
writer.close().await.unwrap();

let buffer = operator.read(path).await.unwrap().to_bytes();
let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
.unwrap()
.build()
.unwrap();
let read = reader.next().unwrap().unwrap();
assert_eq!(to_write, read);
}
71 changes: 46 additions & 25 deletions integrations/parquet/src/async_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,67 @@ use parquet::errors::{ParquetError, Result};
use futures::future::BoxFuture;
use opendal::Writer;

/// OpendalAsyncWriter implements AsyncFileWriter trait by using opendal.
/// AsyncWriter implements AsyncFileWriter trait by using opendal.
///
/// ```no_run
/// use parquet::arrow::async_writer::AsyncFileWriter;
/// use parquet::OpendalAsyncWriter;
/// use opendal::services::S3;
/// use opendal::{Builder, Operator};
///
/// use std::sync::Arc;
///
/// use arrow::array::{ArrayRef, Int64Array, RecordBatch};
///
/// use opendal::{services::S3Config, Operator};
/// use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
/// use parquet_opendal::AsyncWriter;
///
/// #[tokio::main]
/// async fn main() {
/// let builder = S3::from_map(
/// vec![
/// ("access_key".to_string(), "my_access_key".to_string()),
/// ("secret_key".to_string(), "my_secret_key".to_string()),
/// ("endpoint".to_string(), "my_endpoint".to_string()),
/// ("region".to_string(), "my_region".to_string()),
/// ]
/// .into_iter()
/// .collect(),
/// ).unwrap();
///
/// let mut cfg = S3Config::default();
/// cfg.access_key_id = Some("my_access_key".to_string());
/// cfg.secret_access_key = Some("my_secret_key".to_string());
/// cfg.endpoint = Some("my_endpoint".to_string());
/// cfg.region = Some("my_region".to_string());
/// cfg.bucket = "my_bucket".to_string();
///
/// // Create a new operator
/// let operator = Operator::new(builder).unwrap().finish();
/// let operator = Operator::from_config(cfg).unwrap().finish();
/// let path = "/path/to/file.parquet";
/// // Create a new object store
/// let mut writer = Arc::new(OpendalAsyncWriter::new(operator.writer(path)));
///
/// // Create an async writer
/// let writer = AsyncWriter::new(
/// operator
/// .writer_with(path)
/// .chunk(32 * 1024 * 1024)
/// .concurrent(8)
/// .await
/// .unwrap(),
/// );
///
/// let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
/// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
/// let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
/// writer.write(&to_write).await.unwrap();
/// writer.close().await.unwrap();
///
/// let buffer = operator.read(path).await.unwrap().to_bytes();
/// let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
/// .unwrap()
/// .build()
/// .unwrap();
/// let read = reader.next().unwrap().unwrap();
/// assert_eq!(to_write, read);
/// }
/// ```
pub struct OpendalAsyncWriter {
pub struct AsyncWriter {
inner: Writer,
}

impl OpendalAsyncWriter {
impl AsyncWriter {
/// Create a [`OpendalAsyncWriter`] by given [`Writer`].
pub fn new(writer: Writer) -> Self {
Self { inner: writer }
}
}

impl AsyncFileWriter for OpendalAsyncWriter {
impl AsyncFileWriter for AsyncWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
self.inner
Expand Down Expand Up @@ -90,7 +111,7 @@ mod tests {
async fn test_basic() {
let op = Operator::new(services::Memory::default()).unwrap().finish();
let path = "data/test.txt";
let mut writer = OpendalAsyncWriter::new(op.writer(path).await.unwrap());
let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
let bytes = Bytes::from_static(b"hello, world!");
writer.write(bytes).await.unwrap();
let bytes = Bytes::from_static(b"hello, OpenDAL!");
Expand All @@ -105,7 +126,7 @@ mod tests {
async fn test_abort() {
let op = Operator::new(services::Memory::default()).unwrap().finish();
let path = "data/test.txt";
let mut writer = OpendalAsyncWriter::new(op.writer(path).await.unwrap());
let mut writer = AsyncWriter::new(op.writer(path).await.unwrap());
let bytes = Bytes::from_static(b"hello, world!");
writer.write(bytes).await.unwrap();
let bytes = Bytes::from_static(b"hello, OpenDAL!");
Expand Down
63 changes: 43 additions & 20 deletions integrations/parquet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,56 @@

//! parquet_opendal provides parquet IO utils.
//!
//! ```no_run
//! use parquet::arrow::async_writer::AsyncFileWriter;
//! use parquet::OpendalAsyncWriter;
//! use opendal::services::S3;
//! use opendal::{Builder, Operator};
//! AsyncWriter implements AsyncFileWriter trait by using opendal.
//!
//! ```no_run
//! use std::sync::Arc;
//!
//! use arrow::array::{ArrayRef, Int64Array, RecordBatch};
//!
//! use opendal::{services::S3Config, Operator};
//! use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, AsyncArrowWriter};
//! use parquet_opendal::AsyncWriter;
//!
//! #[tokio::main]
//! async fn main() {
//! let builder = S3::from_map(
//! vec![
//! ("access_key".to_string(), "my_access_key".to_string()),
//! ("secret_key".to_string(), "my_secret_key".to_string()),
//! ("endpoint".to_string(), "my_endpoint".to_string()),
//! ("region".to_string(), "my_region".to_string()),
//! ]
//! .into_iter()
//! .collect(),
//! ).unwrap();
//!
//! let mut cfg = S3Config::default();
//! cfg.access_key_id = Some("my_access_key".to_string());
//! cfg.secret_access_key = Some("my_secret_key".to_string());
//! cfg.endpoint = Some("my_endpoint".to_string());
//! cfg.region = Some("my_region".to_string());
//! cfg.bucket = "my_bucket".to_string();
//!
//! // Create a new operator
//! let operator = Operator::new(builder).unwrap().finish();
//! let operator = Operator::from_config(cfg).unwrap().finish();
//! let path = "/path/to/file.parquet";
//! // Create a new object store
//! let mut writer = Arc::new(OpendalAsyncWriter::new(operator.writer(path)));
//!
//! // Create an async writer
//! let writer = AsyncWriter::new(
//! operator
//! .writer_with(path)
//! .chunk(32 * 1024 * 1024)
//! .concurrent(8)
//! .await
//! .unwrap(),
//! );
//!
//! let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
//! let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
//! let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
//! writer.write(&to_write).await.unwrap();
//! writer.close().await.unwrap();
//!
//! let buffer = operator.read(path).await.unwrap().to_bytes();
//! let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
//! .unwrap()
//! .build()
//! .unwrap();
//! let read = reader.next().unwrap().unwrap();
//! assert_eq!(to_write, read);
//! }
//! ```
mod async_writer;

pub use async_writer::OpendalAsyncWriter;
pub use async_writer::AsyncWriter;

0 comments on commit 6c3d08c

Please sign in to comment.