Skip to content

Commit

Permalink
ARROW-7842: [Rust] [Parquet] Arrow list reader
Browse files Browse the repository at this point in the history
This is a port of apache#6770 to the parquet-writer branch.

We'll have more of a chance to test this reader,and ensure that we can roundtrip on list types.

Closes apache#8449 from nevi-me/ARROW-7842-cherry

Authored-by: Neville Dipale <nevilledips@gmail.com>
Signed-off-by: Neville Dipale <nevilledips@gmail.com>
  • Loading branch information
nevi-me authored and GeorgeAp committed Jun 7, 2021
1 parent 84402e5 commit fe5fcff
Show file tree
Hide file tree
Showing 5 changed files with 755 additions and 30 deletions.
17 changes: 17 additions & 0 deletions rust/arrow/src/util/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ macro_rules! make_string {
}};
}

macro_rules! make_string_from_list {
($column: ident, $row: ident) => {{
let list = $column
.as_any()
.downcast_ref::<array::ListArray>()
.ok_or(ArrowError::InvalidArgumentError(format!(
"Repl error: could not convert list column to list array."
)))?
.value($row);
let string_values = (0..list.len())
.map(|i| array_value_to_string(&list.clone(), i))
.collect::<Result<Vec<String>>>()?;
Ok(format!("[{}]", string_values.join(", ")))
}};
}

/// Get the value at the given row in an array as a String.
///
/// Note this function is quite inefficient and is unlikely to be
Expand Down Expand Up @@ -89,6 +105,7 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result<Str
DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => {
make_string!(array::Time64NanosecondArray, column, row)
}
DataType::List(_) => make_string_from_list!(column, row),
DataType::Dictionary(index_type, _value_type) => match **index_type {
DataType::Int8 => dict_array_value_to_string::<Int8Type>(column, row),
DataType::Int16 => dict_array_value_to_string::<Int16Type>(column, row),
Expand Down
97 changes: 96 additions & 1 deletion rust/datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::TryFrom;
use std::env;
use std::sync::Arc;

extern crate arrow;
extern crate datafusion;

use arrow::{array::*, datatypes::TimeUnit};
use arrow::{datatypes::Int32Type, record_batch::RecordBatch};
use arrow::{datatypes::Int32Type, datatypes::Int64Type, record_batch::RecordBatch};
use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
util::display::array_value_to_string,
Expand Down Expand Up @@ -128,6 +129,100 @@ async fn parquet_single_nan_schema() {
}
}

#[tokio::test]
async fn parquet_list_columns() {
let mut ctx = ExecutionContext::new();
let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
ctx.register_parquet(
"list_columns",
&format!("{}/list_columns.parquet", testdata),
)
.unwrap();

let schema = Arc::new(Schema::new(vec![
Field::new(
"int64_list",
DataType::List(Box::new(DataType::Int64)),
true,
),
Field::new("utf8_list", DataType::List(Box::new(DataType::Utf8)), true),
]));

let sql = "SELECT int64_list, utf8_list FROM list_columns";
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let plan = ctx.create_physical_plan(&plan).unwrap();
let results = ctx.collect(plan).await.unwrap();

// int64_list utf8_list
// 0 [1, 2, 3] [abc, efg, hij]
// 1 [None, 1] None
// 2 [4] [efg, None, hij, xyz]

assert_eq!(1, results.len());
let batch = &results[0];
assert_eq!(3, batch.num_rows());
assert_eq!(2, batch.num_columns());
assert_eq!(schema, batch.schema());

let int_list_array = batch
.column(0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let utf8_list_array = batch
.column(1)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();

assert_eq!(
int_list_array
.value(0)
.as_any()
.downcast_ref::<PrimitiveArray<Int64Type>>()
.unwrap(),
&PrimitiveArray::<Int64Type>::from(vec![Some(1), Some(2), Some(3),])
);

assert_eq!(
utf8_list_array
.value(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap(),
&StringArray::try_from(vec![Some("abc"), Some("efg"), Some("hij"),]).unwrap()
);

assert_eq!(
int_list_array
.value(1)
.as_any()
.downcast_ref::<PrimitiveArray<Int64Type>>()
.unwrap(),
&PrimitiveArray::<Int64Type>::from(vec![None, Some(1),])
);

assert!(utf8_list_array.is_null(1));

assert_eq!(
int_list_array
.value(2)
.as_any()
.downcast_ref::<PrimitiveArray<Int64Type>>()
.unwrap(),
&PrimitiveArray::<Int64Type>::from(vec![Some(4),])
);

let result = utf8_list_array.value(2);
let result = result.as_any().downcast_ref::<StringArray>().unwrap();

assert_eq!(result.value(0), "efg");
assert!(result.is_null(1));
assert_eq!(result.value(2), "hij");
assert_eq!(result.value(3), "xyz");
}

#[tokio::test]
async fn csv_count_star() -> Result<()> {
let mut ctx = ExecutionContext::new();
Expand Down
Loading

0 comments on commit fe5fcff

Please sign in to comment.