Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into hash_join_batch_size
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa committed Nov 4, 2023
2 parents 7fbe918 + 2af326a commit 322db73
Show file tree
Hide file tree
Showing 65 changed files with 4,787 additions and 4,193 deletions.
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ If there are user-facing changes then we may require documentation to be updated

<!--
If there are any breaking changes to public APIs, please add the `api change` label.
-->
-->
2 changes: 1 addition & 1 deletion datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl From<DataFusionError> for io::Error {
}

impl DataFusionError {
const BACK_TRACE_SEP: &str = "\n\nbacktrace: ";
const BACK_TRACE_SEP: &'static str = "\n\nbacktrace: ";

/// Get deepest underlying [`DataFusionError`]
///
Expand Down
20 changes: 10 additions & 10 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::cast::{
};
use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err};
use crate::hash_utils::create_hashes;
use crate::utils::wrap_into_list_array;
use crate::utils::array_into_list_array;
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::compute::kernels::numeric::*;
use arrow::datatypes::{i256, FieldRef, Fields, SchemaBuilder};
Expand Down Expand Up @@ -1667,7 +1667,7 @@ impl ScalarValue {
} else {
Self::iter_to_array(values.iter().cloned()).unwrap()
};
Arc::new(wrap_into_list_array(values))
Arc::new(array_into_list_array(values))
}

/// Converts a scalar value into an array of `size` rows.
Expand Down Expand Up @@ -2058,7 +2058,7 @@ impl ScalarValue {
let list_array = as_list_array(array);
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
let arr = Arc::new(wrap_into_list_array(nested_array));
let arr = Arc::new(array_into_list_array(nested_array));

ScalarValue::List(arr)
}
Expand All @@ -2067,7 +2067,7 @@ impl ScalarValue {
let list_array = as_fixed_size_list_array(array)?;
let nested_array = list_array.value(index);
// Produces a single element `ListArray` with the value at `index`.
let arr = Arc::new(wrap_into_list_array(nested_array));
let arr = Arc::new(array_into_list_array(nested_array));

ScalarValue::List(arr)
}
Expand Down Expand Up @@ -3052,7 +3052,7 @@ mod tests {

let array = ScalarValue::new_list(scalars.as_slice(), &DataType::Utf8);

let expected = wrap_into_list_array(Arc::new(StringArray::from(vec![
let expected = array_into_list_array(Arc::new(StringArray::from(vec![
"rust",
"arrow",
"data-fusion",
Expand Down Expand Up @@ -3091,9 +3091,9 @@ mod tests {
#[test]
fn iter_to_array_string_test() {
let arr1 =
wrap_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
array_into_list_array(Arc::new(StringArray::from(vec!["foo", "bar", "baz"])));
let arr2 =
wrap_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));
array_into_list_array(Arc::new(StringArray::from(vec!["rust", "world"])));

let scalars = vec![
ScalarValue::List(Arc::new(arr1)),
Expand Down Expand Up @@ -4335,13 +4335,13 @@ mod tests {
// Define list-of-structs scalars

let nl0_array = ScalarValue::iter_to_array(vec![s0.clone(), s1.clone()]).unwrap();
let nl0 = ScalarValue::List(Arc::new(wrap_into_list_array(nl0_array)));
let nl0 = ScalarValue::List(Arc::new(array_into_list_array(nl0_array)));

let nl1_array = ScalarValue::iter_to_array(vec![s2.clone()]).unwrap();
let nl1 = ScalarValue::List(Arc::new(wrap_into_list_array(nl1_array)));
let nl1 = ScalarValue::List(Arc::new(array_into_list_array(nl1_array)));

let nl2_array = ScalarValue::iter_to_array(vec![s1.clone()]).unwrap();
let nl2 = ScalarValue::List(Arc::new(wrap_into_list_array(nl2_array)));
let nl2 = ScalarValue::List(Arc::new(array_into_list_array(nl2_array)));

// iter_to_array for list-of-struct
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
Expand Down
46 changes: 44 additions & 2 deletions datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

//! This module provides the bisect function, which implements binary search.

use crate::error::_internal_err;
use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::{ArrayRef, PrimitiveArray};
use arrow::buffer::OffsetBuffer;
use arrow::compute;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use arrow_array::ListArray;
use arrow_array::{Array, ListArray};
use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
Expand Down Expand Up @@ -338,7 +339,7 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(

/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
pub fn wrap_into_list_array(arr: ArrayRef) -> ListArray {
pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
let offsets = OffsetBuffer::from_lengths([arr.len()]);
ListArray::new(
Arc::new(Field::new("item", arr.data_type().to_owned(), true)),
Expand All @@ -348,6 +349,47 @@ pub fn wrap_into_list_array(arr: ArrayRef) -> ListArray {
)
}

/// Wrap arrays into a single element `ListArray`.
///
/// Example:
/// ```
/// use arrow::array::{Int32Array, ListArray, ArrayRef};
/// use arrow::datatypes::{Int32Type, Field};
/// use std::sync::Arc;
///
/// let arr1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
/// let arr2 = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef;
///
/// let list_arr = datafusion_common::utils::arrays_into_list_array([arr1, arr2]).unwrap();
///
/// let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(
/// vec![
/// Some(vec![Some(1), Some(2), Some(3)]),
/// Some(vec![Some(4), Some(5), Some(6)]),
/// ]
/// );
///
/// assert_eq!(list_arr, expected);
pub fn arrays_into_list_array(
arr: impl IntoIterator<Item = ArrayRef>,
) -> Result<ListArray> {
let arr = arr.into_iter().collect::<Vec<_>>();
if arr.is_empty() {
return _internal_err!("Cannot wrap empty array into list array");
}

let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
// Assume data type is consistent
let data_type = arr[0].data_type().to_owned();
let values = arr.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
Ok(ListArray::new(
Arc::new(Field::new("item", data_type, true)),
OffsetBuffer::from_lengths(lens),
arrow::compute::concat(values.as_slice())?,
None,
))
}

/// An extension trait for smart pointers. Provides an interface to get a
/// raw pointer to the data (with metadata stripped away).
///
Expand Down
197 changes: 186 additions & 11 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)

use std::any::Any;
use std::io::{Read, Seek};
use std::borrow::Cow;
use std::sync::Arc;

use crate::datasource::file_format::FileFormat;
Expand All @@ -29,13 +29,18 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::ExecutionPlan;

use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::FileReader;
use arrow_schema::{Schema, SchemaRef};
use arrow::ipc::root_as_message;
use arrow_schema::{ArrowError, Schema, SchemaRef};

use bytes::Bytes;
use datafusion_common::{FileType, Statistics};
use datafusion_physical_expr::PhysicalExpr;

use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

/// Arrow `FileFormat` implementation.
Expand All @@ -59,13 +64,11 @@ impl FileFormat for ArrowFormat {
let r = store.as_ref().get(&object.location).await?;
let schema = match r.payload {
GetResultPayload::File(mut file, _) => {
read_arrow_schema_from_reader(&mut file)?
let reader = FileReader::try_new(&mut file, None)?;
reader.schema()
}
GetResultPayload::Stream(_) => {
// TODO: Fetching entire file to get schema is potentially wasteful
let data = r.bytes().await?;
let mut cursor = std::io::Cursor::new(&data);
read_arrow_schema_from_reader(&mut cursor)?
GetResultPayload::Stream(stream) => {
infer_schema_from_file_stream(stream).await?
}
};
schemas.push(schema.as_ref().clone());
Expand Down Expand Up @@ -99,7 +102,179 @@ impl FileFormat for ArrowFormat {
}
}

fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) -> Result<SchemaRef> {
let reader = FileReader::try_new(reader, None)?;
Ok(reader.schema())
const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
/// See <https://github.com/apache/arrow-rs/issues/5021>
async fn infer_schema_from_file_stream(
mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
// Expected format:
// <magic number "ARROW1"> - 6 bytes
// <empty padding bytes [to 8 byte boundary]> - 2 bytes
// <continutation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
// <metadata_size: int32> - 4 bytes
// <metadata_flatbuffer: bytes>
// <rest of file bytes>

// So in first read we need at least all known sized sections,
// which is 6 + 2 + 4 + 4 = 16 bytes.
let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;

// Files should start with these magic bytes
if bytes[0..6] != ARROW_MAGIC {
return Err(ArrowError::ParseError(
"Arrow file does not contian correct header".to_string(),
))?;
}

// Since continuation marker bytes added in later versions
let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
(&bytes[12..16], 16)
} else {
(&bytes[8..12], 12)
};

let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
let meta_len = i32::from_le_bytes(meta_len);

// Read bytes for Schema message
let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize {
// Need to read more bytes to decode Message
let mut block_data = Vec::with_capacity(meta_len as usize);
// In case we had some spare bytes in our initial read chunk
block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
let size_to_read = meta_len as usize - block_data.len();
let block_data =
collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?;
Cow::Owned(block_data)
} else {
// Already have the bytes we need
let end_index = meta_len as usize + rest_of_bytes_start_index;
let block_data = &bytes[rest_of_bytes_start_index..end_index];
Cow::Borrowed(block_data)
};

// Decode Schema message
let message = root_as_message(&block_data).map_err(|err| {
ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}"))
})?;
let ipc_schema = message.header_as_schema().ok_or_else(|| {
ArrowError::IpcError("Unable to read IPC message as schema".to_string())
})?;
let schema = fb_to_schema(ipc_schema);

Ok(Arc::new(schema))
}

async fn collect_at_least_n_bytes(
stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
n: usize,
extend_from: Option<Vec<u8>>,
) -> Result<Vec<u8>> {
let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n));
// If extending existing buffer then ensure we read n additional bytes
let n = n + buf.len();
while let Some(bytes) = stream.next().await.transpose()? {
buf.extend_from_slice(&bytes);
if buf.len() >= n {
break;
}
}
if buf.len() < n {
return Err(ArrowError::ParseError(
"Unexpected end of byte stream for Arrow IPC file".to_string(),
))?;
}
Ok(buf)
}

#[cfg(test)]
mod tests {
use chrono::DateTime;
use object_store::{chunked::ChunkedStore, memory::InMemory, path::Path};

use crate::execution::context::SessionContext;

use super::*;

#[tokio::test]
async fn test_infer_schema_stream() -> Result<()> {
let mut bytes = std::fs::read("tests/data/example.arrow")?;
bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
let location = Path::parse("example.arrow")?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let session_ctx = SessionContext::new();
let state = session_ctx.state();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: usize::MAX,
e_tag: None,
};

let arrow_format = ArrowFormat {};
let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];

// Test chunk sizes where too small so we keep having to read more bytes
// And when large enough that first read contains all we need
for chunk_size in [7, 3000] {
let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
let inferred_schema = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
&[object_meta.clone()],
)
.await?;
let actual_fields = inferred_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect::<Vec<_>>();
assert_eq!(expected, actual_fields);
}

Ok(())
}

#[tokio::test]
async fn test_infer_schema_short_stream() -> Result<()> {
let mut bytes = std::fs::read("tests/data/example.arrow")?;
bytes.truncate(20); // should cause error that file shorter than expected
let location = Path::parse("example.arrow")?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let session_ctx = SessionContext::new();
let state = session_ctx.state();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: usize::MAX,
e_tag: None,
};

let arrow_format = ArrowFormat {};

let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
let err = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
&[object_meta.clone()],
)
.await;

assert!(err.is_err());
assert_eq!(
"Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file",
err.unwrap_err().to_string()
);

Ok(())
}
}
Loading

0 comments on commit 322db73

Please sign in to comment.