diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index c3f91dd9b1d1..7e871fa94f0e 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -78,6 +78,8 @@ pub enum ScalarValue { LargeUtf8(Option), /// binary Binary(Option>), + /// fixed size binary + FixedSizeBinary(i32, Option>), /// large binary LargeBinary(Option>), /// list of nested ScalarValue @@ -160,6 +162,8 @@ impl PartialEq for ScalarValue { (LargeUtf8(_), _) => false, (Binary(v1), Binary(v2)) => v1.eq(v2), (Binary(_), _) => false, + (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.eq(v2), + (FixedSizeBinary(_, _), _) => false, (LargeBinary(v1), LargeBinary(v2)) => v1.eq(v2), (LargeBinary(_), _) => false, (List(v1, t1), List(v2, t2)) => v1.eq(v2) && t1.eq(t2), @@ -248,6 +252,8 @@ impl PartialOrd for ScalarValue { (LargeUtf8(_), _) => None, (Binary(v1), Binary(v2)) => v1.partial_cmp(v2), (Binary(_), _) => None, + (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.partial_cmp(v2), + (FixedSizeBinary(_, _), _) => None, (LargeBinary(v1), LargeBinary(v2)) => v1.partial_cmp(v2), (LargeBinary(_), _) => None, (List(v1, t1), List(v2, t2)) => { @@ -537,6 +543,7 @@ impl std::hash::Hash for ScalarValue { Utf8(v) => v.hash(state), LargeUtf8(v) => v.hash(state), Binary(v) => v.hash(state), + FixedSizeBinary(_, v) => v.hash(state), LargeBinary(v) => v.hash(state), List(v, t) => { v.hash(state); @@ -901,6 +908,7 @@ impl ScalarValue { ScalarValue::Utf8(_) => DataType::Utf8, ScalarValue::LargeUtf8(_) => DataType::LargeUtf8, ScalarValue::Binary(_) => DataType::Binary, + ScalarValue::FixedSizeBinary(sz, _) => DataType::FixedSizeBinary(*sz), ScalarValue::LargeBinary(_) => DataType::LargeBinary, ScalarValue::List(_, field) => DataType::List(Box::new(Field::new( "item", @@ -988,6 +996,7 @@ impl ScalarValue { ScalarValue::Utf8(v) => v.is_none(), ScalarValue::LargeUtf8(v) => v.is_none(), ScalarValue::Binary(v) => v.is_none(), + ScalarValue::FixedSizeBinary(_, v) => v.is_none(), ScalarValue::LargeBinary(v) => v.is_none(), ScalarValue::List(v, _) => v.is_none(), ScalarValue::Date32(v) => v.is_none(), @@ -1357,13 +1366,30 @@ impl ScalarValue { _ => unreachable!("Invalid dictionary keys type: {:?}", key_type), } } + DataType::FixedSizeBinary(_) => { + let array = scalars + .map(|sv| { + if let ScalarValue::FixedSizeBinary(_, v) = sv { + Ok(v) + } else { + Err(DataFusionError::Internal(format!( + "Inconsistent types in ScalarValue::iter_to_array. \ + Expected {:?}, got {:?}", + data_type, sv + ))) + } + }) + .collect::>>()?; + let array = + FixedSizeBinaryArray::try_from_sparse_iter(array.into_iter())?; + Arc::new(array) + } // explicitly enumerate unsupported types so newly added // types must be aknowledged DataType::Float16 | DataType::Time32(_) | DataType::Time64(_) | DataType::Duration(_) - | DataType::FixedSizeBinary(_) | DataType::FixedSizeList(_, _) | DataType::Interval(_) | DataType::LargeList(_) @@ -1566,6 +1592,20 @@ impl ScalarValue { Arc::new(repeat(None::<&str>).take(size).collect::()) } }, + ScalarValue::FixedSizeBinary(_, e) => match e { + Some(value) => Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter( + repeat(Some(value.as_slice())).take(size), + ) + .unwrap(), + ), + None => Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter( + repeat(None::<&[u8]>).take(size), + ) + .unwrap(), + ), + }, ScalarValue::LargeBinary(e) => match e { Some(value) => Arc::new( repeat(Some(value.as_slice())) @@ -1851,6 +1891,23 @@ impl ScalarValue { }; ScalarValue::new_list(value, nested_type.data_type().clone()) } + DataType::FixedSizeBinary(_) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + let size = match array.data_type() { + DataType::FixedSizeBinary(size) => *size, + _ => unreachable!(), + }; + ScalarValue::FixedSizeBinary( + size, + match array.is_null(index) { + true => None, + false => Some(array.value(index).into()), + }, + ) + } other => { return Err(DataFusionError::NotImplemented(format!( "Can't create a scalar from array of type \"{:?}\"", @@ -1941,6 +1998,9 @@ impl ScalarValue { ScalarValue::Binary(val) => { eq_array_primitive!(array, index, BinaryArray, val) } + ScalarValue::FixedSizeBinary(_, val) => { + eq_array_primitive!(array, index, FixedSizeBinaryArray, val) + } ScalarValue::LargeBinary(val) => { eq_array_primitive!(array, index, LargeBinaryArray, val) } @@ -2285,6 +2345,17 @@ impl fmt::Display for ScalarValue { )?, None => write!(f, "NULL")?, }, + ScalarValue::FixedSizeBinary(_, e) => match e { + Some(l) => write!( + f, + "{}", + l.iter() + .map(|v| format!("{}", v)) + .collect::>() + .join(",") + )?, + None => write!(f, "NULL")?, + }, ScalarValue::LargeBinary(e) => match e { Some(l) => write!( f, @@ -2365,6 +2436,12 @@ impl fmt::Debug for ScalarValue { ScalarValue::LargeUtf8(Some(_)) => write!(f, "LargeUtf8(\"{}\")", self), ScalarValue::Binary(None) => write!(f, "Binary({})", self), ScalarValue::Binary(Some(_)) => write!(f, "Binary(\"{}\")", self), + ScalarValue::FixedSizeBinary(size, None) => { + write!(f, "FixedSizeBinary({}, {})", size, self) + } + ScalarValue::FixedSizeBinary(size, Some(_)) => { + write!(f, "FixedSizeBinary({}, \"{}\")", size, self) + } ScalarValue::LargeBinary(None) => write!(f, "LargeBinary({})", self), ScalarValue::LargeBinary(Some(_)) => write!(f, "LargeBinary(\"{}\")", self), ScalarValue::List(_, _) => write!(f, "List([{}])", self), diff --git a/datafusion/core/tests/parquet/test_binary.parquet b/datafusion/core/tests/parquet/test_binary.parquet new file mode 100644 index 000000000000..9d906bc490d0 Binary files /dev/null and b/datafusion/core/tests/parquet/test_binary.parquet differ diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index 8bec4f1dd3db..c70466737f40 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -48,6 +48,28 @@ async fn parquet_query() { assert_batches_eq!(expected, &actual); } +#[tokio::test] +async fn fixed_size_binary_columns() { + let ctx = SessionContext::new(); + ctx.register_parquet( + "t0", + "tests/parquet/test_binary.parquet", + ParquetReadOptions::default(), + ) + .await + .unwrap(); + let sql = "SELECT ids FROM t0 ORDER BY ids"; + let plan = ctx.create_logical_plan(sql).unwrap(); + let plan = ctx.optimize(&plan).unwrap(); + let plan = ctx.create_physical_plan(&plan).await.unwrap(); + let task_ctx = ctx.task_ctx(); + let results = collect(plan, task_ctx).await.unwrap(); + for batch in results { + assert_eq!(466, batch.num_rows()); + assert_eq!(1, batch.num_columns()); + } +} + #[tokio::test] async fn parquet_single_nan_schema() { let ctx = SessionContext::new(); diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index 47b779fffc74..5e47bbfe9f83 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -1187,6 +1187,11 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { Value::LargeBinaryValue(s.to_owned()) }) } + scalar::ScalarValue::FixedSizeBinary(_, _) => { + return Err(Error::General( + "FixedSizeBinary is not yet implemented".to_owned(), + )) + } datafusion::scalar::ScalarValue::Time64(v) => { create_proto_scalar(v, PrimitiveScalarType::Time64, |v| {