Skip to content

Commit

Permalink
Officially maintained Arrow2 branch (#1556)
Browse files Browse the repository at this point in the history
Migrate to arrow2

Co-authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Co-authored-by: Yijie Shen <henry.yijieshen@gmail.com>
Co-authored-by: Guillaume Balaine <igosuki@gmail.com>
Co-authored-by: Guillaume Balaine <igosuki.github@gmail.com>
  • Loading branch information
5 people committed Jan 20, 2022
1 parent 2008b1d commit c0c9c72
Show file tree
Hide file tree
Showing 176 changed files with 5,942 additions and 6,944 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,7 @@ jobs:
run: |
cargo miri setup
cargo clean
# Ignore MIRI errors until we can get a clean run
cargo miri test || true
cargo miri test
# Check answers are correct when hash values collide
hash-collisions:
Expand Down
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ exclude = ["python"]
[profile.release]
lto = true
codegen-units = 1

[patch.crates-io]
arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", rev = "ef7937dfe56033c2cc491482c67587b52cd91554" }
#arrow2 = { git = "https://github.com/blaze-init/arrow2.git", branch = "shuffle_ipc" }
#parquet2 = { git = "https://github.com/blaze-init/parquet2.git", branch = "meta_new" }
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ Run a SQL query against data stored in a CSV:

```rust
use datafusion::prelude::*;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::arrow::record_batch::RecordBatch;

#[tokio::main]
Expand All @@ -93,7 +92,6 @@ Use the DataFrame API to process data stored in a CSV:

```rust
use datafusion::prelude::*;
use datafusion::arrow::util::pretty::print_batches;
use datafusion::arrow::record_batch::RecordBatch;

#[tokio::main]
Expand Down
4 changes: 2 additions & 2 deletions ballista-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ rust-version = "1.57"
[dependencies]
datafusion = { path = "../datafusion" }
ballista = { path = "../ballista/rust/client", version = "0.6.0"}
prost = "0.8"
tonic = "0.5"
prost = "0.9"
tonic = "0.6"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
futures = "0.3"
num_cpus = "1.13.0"
2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> Result<()> {
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);

let testdata = datafusion::arrow::util::test_util::parquet_test_data();
let testdata = datafusion::test_util::parquet_test_data();

let filename = &format!("{}/alltypes_plain.parquet", testdata);

Expand Down
2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> Result<()> {
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);

let testdata = datafusion::arrow::util::test_util::arrow_test_data();
let testdata = datafusion::test_util::arrow_test_data();

// register csv file with the execution context
ctx.register_csv(
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ data set.

```rust,no_run
use ballista::prelude::*;
use datafusion::arrow::util::pretty;
use datafusion::arrow::io::print;
use datafusion::prelude::CsvReadOptions;
#[tokio::main]
Expand Down Expand Up @@ -125,7 +125,7 @@ async fn main() -> Result<()> {
// collect the results and print them to stdout
let results = df.collect().await?;
pretty::print_batches(&results)?;
print::print(&results);
Ok(())
}
```
Expand Down
5 changes: 3 additions & 2 deletions ballista/rust/client/src/columnar_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use ballista_core::error::{ballista_error, Result};

use datafusion::arrow::{
array::ArrayRef,
compute::aggregate::estimated_bytes_size,
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};
Expand Down Expand Up @@ -50,7 +51,7 @@ impl ColumnarBatch {
.collect();

Self {
schema: batch.schema(),
schema: batch.schema().clone(),
columns,
}
}
Expand Down Expand Up @@ -156,7 +157,7 @@ impl ColumnarValue {

pub fn memory_size(&self) -> usize {
match self {
ColumnarValue::Columnar(array) => array.get_array_memory_size(),
ColumnarValue::Columnar(array) => estimated_bytes_size(array.as_ref()),
_ => 0,
}
}
Expand Down
12 changes: 5 additions & 7 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,21 @@ async-trait = "0.1.36"
futures = "0.3"
hashbrown = "0.11"
log = "0.4"
prost = "0.8"
prost = "0.9"
serde = {version = "1", features = ["derive"]}
sqlparser = "0.13"
tokio = "1.0"
tonic = "0.5"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
chrono = { version = "0.4", default-features = false }

# workaround for https://github.com/apache/arrow-datafusion/issues/1498
# should be able to remove when we update arrow-flight
quote = "=1.0.10"
arrow-flight = { version = "6.4.0" }
arrow-format = { version = "0.3", features = ["flight-data", "flight-service"] }
arrow = { package = "arrow2", version="0.8", features = ["io_ipc", "io_flight"] }

datafusion = { path = "../../../datafusion", version = "6.0.0" }

[dev-dependencies]
tempfile = "3"

[build-dependencies]
tonic-build = { version = "0.5" }
tonic-build = { version = "0.6" }
24 changes: 21 additions & 3 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,7 @@ enum TimeUnit{
enum IntervalUnit{
YearMonth = 0;
DayTime = 1;
MonthDayNano = 2;
}

message Decimal{
Expand All @@ -1028,11 +1029,11 @@ message List{

message FixedSizeList{
Field field_type = 1;
int32 list_size = 2;
uint32 list_size = 2;
}

message Dictionary{
ArrowType key = 1;
IntegerType key = 1;
ArrowType value = 2;
}

Expand Down Expand Up @@ -1135,7 +1136,7 @@ message ArrowType{
EmptyMessage UTF8 =14 ;
EmptyMessage LARGE_UTF8 = 32;
EmptyMessage BINARY =15 ;
int32 FIXED_SIZE_BINARY =16 ;
uint32 FIXED_SIZE_BINARY =16 ;
EmptyMessage LARGE_BINARY = 31;
EmptyMessage DATE32 =17 ;
EmptyMessage DATE64 =18 ;
Expand All @@ -1154,6 +1155,23 @@ message ArrowType{
}
}

// Broke out into multiple message types so that type
// metadata did not need to be in separate message
//All types that are of the empty message types contain no additional metadata
// about the type
message IntegerType{
oneof integer_type_enum{
EmptyMessage INT8 = 1;
EmptyMessage INT16 = 2;
EmptyMessage INT32 = 3;
EmptyMessage INT64 = 4;
EmptyMessage UINT8 = 5;
EmptyMessage UINT16 = 6;
EmptyMessage UINT32 = 7;
EmptyMessage UINT64 = 8;
}
}




Expand Down
44 changes: 30 additions & 14 deletions ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

//! Client API for sending requests to executors.

use std::sync::Arc;
use arrow::io::flight::deserialize_schemas;
use arrow::io::ipc::IpcSchema;
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, pin::Pin};
use std::{
convert::{TryFrom, TryInto},
Expand All @@ -31,11 +33,10 @@ use crate::serde::scheduler::{
Action, ExecutePartition, ExecutePartitionResult, PartitionId, PartitionStats,
};

use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::Ticket;
use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
use arrow_format::flight::data::{FlightData, Ticket};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use datafusion::arrow::{
array::{StringArray, StructArray},
array::{StructArray, Utf8Array},
datatypes::{Schema, SchemaRef},
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
Expand Down Expand Up @@ -122,10 +123,12 @@ impl BallistaClient {
{
Some(flight_data) => {
// convert FlightData to a stream
let schema = Arc::new(Schema::try_from(&flight_data)?);
let (schema, ipc_schema) =
deserialize_schemas(flight_data.data_body.as_slice()).unwrap();
let schema = Arc::new(schema);

// all the remaining stream messages should be dictionary and record batches
Ok(Box::pin(FlightDataStream::new(stream, schema)))
Ok(Box::pin(FlightDataStream::new(stream, schema, ipc_schema)))
}
None => Err(ballista_error(
"Did not receive schema batch from flight server",
Expand All @@ -135,32 +138,45 @@ impl BallistaClient {
}

struct FlightDataStream {
stream: Streaming<FlightData>,
stream: Mutex<Streaming<FlightData>>,
schema: SchemaRef,
ipc_schema: IpcSchema,
}

impl FlightDataStream {
pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
Self { stream, schema }
pub fn new(
stream: Streaming<FlightData>,
schema: SchemaRef,
ipc_schema: IpcSchema,
) -> Self {
Self {
stream: Mutex::new(stream),
schema,
ipc_schema,
}
}
}

impl Stream for FlightDataStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx).map(|x| match x {
let mut stream = self.stream.lock().unwrap();
stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
.map_err(|e| ArrowError::from_external_error(Box::new(e)))
.and_then(|flight_data_chunk| {
flight_data_to_arrow_batch(
let hm = HashMap::new();

arrow::io::flight::deserialize_batch(
&flight_data_chunk,
self.schema.clone(),
&[],
&self.ipc_schema,
&hm,
)
});
Some(converted_chunk)
Expand Down
Loading

0 comments on commit c0c9c72

Please sign in to comment.