Skip to content

Commit

Permalink
add from_slice trait to ease arrow2 migration
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Jan 16, 2022
1 parent 6f7b2d2 commit cbfa8ec
Show file tree
Hide file tree
Showing 38 changed files with 353 additions and 253 deletions.
13 changes: 7 additions & 6 deletions datafusion-cli/src/print_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ mod tests {
use super::*;
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::from_slice::FromSlice;
use std::sync::Arc;

#[test]
Expand Down Expand Up @@ -168,9 +169,9 @@ mod tests {
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from_slice(&[1, 2, 3])),
Arc::new(Int32Array::from_slice(&[4, 5, 6])),
Arc::new(Int32Array::from_slice(&[7, 8, 9])),
],
)
.unwrap();
Expand Down Expand Up @@ -198,9 +199,9 @@ mod tests {
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from_slice(&[1, 2, 3])),
Arc::new(Int32Array::from_slice(&[4, 5, 6])),
Arc::new(Int32Array::from_slice(&[7, 8, 9])),
],
)
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/dataframe_in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use std::sync::Arc;
use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::from_slice::FromSlice;
use datafusion::prelude::*;

/// This example demonstrates how to use the DataFrame API against in-memory data.
Expand All @@ -39,7 +39,7 @@ async fn main() -> Result<()> {
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
],
)?;

Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/simple_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion::arrow::{
record_batch::RecordBatch,
};

use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::functions::Volatility;
use datafusion::{error::Result, logical_plan::create_udaf, physical_plan::Accumulator};
use datafusion::{prelude::*, scalar::ScalarValue};
Expand All @@ -37,11 +38,11 @@ fn create_context() -> Result<ExecutionContext> {
// define data in two partitions
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
vec![Arc::new(Float32Array::from_slice(&[2.0, 4.0, 8.0]))],
)?;
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Float32Array::from(vec![64.0]))],
vec![Arc::new(Float32Array::from_slice(&[64.0]))],
)?;

// declare a new context. In spark API, this corresponds to a new spark SQLsession
Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/simple_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use datafusion::{
physical_plan::functions::Volatility,
};

use datafusion::from_slice::FromSlice;
use datafusion::prelude::*;
use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
use std::sync::Arc;
Expand All @@ -42,8 +43,8 @@ fn create_context() -> Result<ExecutionContext> {
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
Arc::new(Float32Array::from_slice(&[2.1, 3.1, 4.1, 5.1])),
Arc::new(Float64Array::from_slice(&[1.0, 2.0, 3.0, 4.0])),
],
)?;

Expand Down
41 changes: 21 additions & 20 deletions datafusion/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl TableProvider for MemTable {
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use futures::StreamExt;
Expand All @@ -164,9 +165,9 @@ mod tests {
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from_slice(&[1, 2, 3])),
Arc::new(Int32Array::from_slice(&[4, 5, 6])),
Arc::new(Int32Array::from_slice(&[7, 8, 9])),
Arc::new(Int32Array::from(vec![None, None, Some(9)])),
],
)?;
Expand Down Expand Up @@ -197,9 +198,9 @@ mod tests {
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from_slice(&[1, 2, 3])),
Arc::new(Int32Array::from_slice(&[4, 5, 6])),
Arc::new(Int32Array::from_slice(&[7, 8, 9])),
],
)?;

Expand All @@ -225,9 +226,9 @@ mod tests {
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from_slice(&[1, 2, 3])),
Arc::new(Int32Array::from_slice(&[4, 5, 6])),
Arc::new(Int32Array::from_slice(&[7, 8, 9])),
],
)?;

Expand Down Expand Up @@ -262,9 +263,9 @@ mod tests {
let batch = RecordBatch::try_new(
schema1,
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from_slice(&[1, 2, 3])),
Arc::new(Int32Array::from_slice(&[4, 5, 6])),
Arc::new(Int32Array::from_slice(&[7, 8, 9])),
],
)?;

Expand Down Expand Up @@ -295,8 +296,8 @@ mod tests {
let batch = RecordBatch::try_new(
schema1,
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![7, 5, 9])),
Arc::new(Int32Array::from_slice(&[1, 2, 3])),
Arc::new(Int32Array::from_slice(&[7, 5, 9])),
],
)?;

Expand Down Expand Up @@ -339,18 +340,18 @@ mod tests {
let batch1 = RecordBatch::try_new(
Arc::new(schema1),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from_slice(&[1, 2, 3])),
Arc::new(Int32Array::from_slice(&[4, 5, 6])),
Arc::new(Int32Array::from_slice(&[7, 8, 9])),
],
)?;

let batch2 = RecordBatch::try_new(
Arc::new(schema2),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
Arc::new(Int32Array::from_slice(&[1, 2, 3])),
Arc::new(Int32Array::from_slice(&[4, 5, 6])),
Arc::new(Int32Array::from_slice(&[7, 8, 9])),
],
)?;

Expand Down
35 changes: 18 additions & 17 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,7 @@ impl FunctionRegistry for ExecutionContextState {
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::logical_plan::plan::Projection;
use crate::logical_plan::TableScan;
use crate::logical_plan::{binary_expr, lit, Operator};
Expand Down Expand Up @@ -1514,9 +1515,9 @@ mod tests {
let partitions = vec![vec![RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(Int32Array::from(vec![2, 12, 12, 120])),
Arc::new(Int32Array::from(vec![3, 12, 12, 120])),
Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])),
Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])),
],
)?]];

Expand Down Expand Up @@ -2936,11 +2937,11 @@ mod tests {
),
(
DataType::Int32,
Arc::new(Int32Array::from(vec![1])) as ArrayRef,
Arc::new(Int32Array::from_slice(&[1])) as ArrayRef,
),
(
DataType::Int64,
Arc::new(Int64Array::from(vec![1])) as ArrayRef,
Arc::new(Int64Array::from_slice(&[1])) as ArrayRef,
),
(
DataType::UInt8,
Expand All @@ -2952,19 +2953,19 @@ mod tests {
),
(
DataType::UInt32,
Arc::new(UInt32Array::from(vec![1])) as ArrayRef,
Arc::new(UInt32Array::from_slice(&[1])) as ArrayRef,
),
(
DataType::UInt64,
Arc::new(UInt64Array::from(vec![1])) as ArrayRef,
Arc::new(UInt64Array::from_slice(&[1])) as ArrayRef,
),
(
DataType::Float32,
Arc::new(Float32Array::from(vec![1.0_f32])) as ArrayRef,
Arc::new(Float32Array::from_slice(&[1.0_f32])) as ArrayRef,
),
(
DataType::Float64,
Arc::new(Float64Array::from(vec![1.0_f64])) as ArrayRef,
Arc::new(Float64Array::from_slice(&[1.0_f64])) as ArrayRef,
),
];

Expand Down Expand Up @@ -3278,8 +3279,8 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(Int32Array::from(vec![2, 12, 12, 120])),
Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])),
],
)?;

Expand Down Expand Up @@ -3379,11 +3380,11 @@ mod tests {

let batch1 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
vec![Arc::new(Int32Array::from_slice(&[1, 2, 3]))],
)?;
let batch2 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int32Array::from(vec![4, 5]))],
vec![Arc::new(Int32Array::from_slice(&[4, 5]))],
)?;

let mut ctx = ExecutionContext::new();
Expand Down Expand Up @@ -3416,11 +3417,11 @@ mod tests {

let batch1 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
vec![Arc::new(Int32Array::from_slice(&[1, 2, 3]))],
)?;
let batch2 = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int32Array::from(vec![4, 5]))],
vec![Arc::new(Int32Array::from_slice(&[4, 5]))],
)?;

let mut ctx = ExecutionContext::new();
Expand Down Expand Up @@ -3880,8 +3881,8 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(Int32Array::from(vec![1])),
Arc::new(Float64Array::from(vec![1.0])),
Arc::new(Int32Array::from_slice(&[1])),
Arc::new(Float64Array::from_slice(&[1.0])),
Arc::new(StringArray::from(vec![Some("foo")])),
Arc::new(LargeStringArray::from(vec![Some("bar")])),
Arc::new(BinaryArray::from(vec![b"foo" as &[u8]])),
Expand Down
45 changes: 45 additions & 0 deletions datafusion/src/from_slice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! A trait to define from_slice functions for arrow types
//!
//! This file essentially exists to ease the transition onto arrow2
use arrow::array::{ArrayData, PrimitiveArray};
use arrow::buffer::Buffer;
use arrow::datatypes::ArrowPrimitiveType;

/// A trait to define from_slice functions for arrow primitive array types
pub trait FromSlice<T>
where
T: ArrowPrimitiveType,
{
/// convert a slice of native types into a primitive array (without nulls)
fn from_slice(slice: &[T::Native]) -> PrimitiveArray<T>;
}

/// default implementation for primitive types
// #[cfg(test)]
impl<T: ArrowPrimitiveType> FromSlice<T> for PrimitiveArray<T> {
fn from_slice(slice: &[T::Native]) -> PrimitiveArray<T> {
let array_data = ArrayData::builder(T::DATA_TYPE)
.len(slice.len())
.add_buffer(Buffer::from_slice_ref(&slice));
let array_data = unsafe { array_data.build_unchecked() };
PrimitiveArray::<T>::from(array_data)
}
}
2 changes: 2 additions & 0 deletions datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ pub(crate) mod field_util;
#[cfg(feature = "pyarrow")]
mod pyarrow;

pub mod from_slice;

#[cfg(test)]
pub mod test;
pub mod test_util;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema};
Expand Down Expand Up @@ -336,7 +337,7 @@ mod tests {
fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]))],
vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
)
.unwrap()
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ impl<T> Drop for AbortOnDropMany<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use arrow::{
array::{Float32Array, Float64Array},
datatypes::{DataType, Field, Schema},
Expand Down Expand Up @@ -343,8 +344,8 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Float32Array::from(vec![1., 2., 3.])),
Arc::new(Float64Array::from(vec![9., 8., 7.])),
Arc::new(Float32Array::from_slice(&[1., 2., 3.])),
Arc::new(Float64Array::from_slice(&[9., 8., 7.])),
],
)?;
let result =
Expand Down
Loading

0 comments on commit cbfa8ec

Please sign in to comment.