From 9268a85bb59c6616135ce1dc0dd28d24aad29c00 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 25 Sep 2022 10:02:34 -0400 Subject: [PATCH 1/5] Simplify serialization by removing redundant PrimitiveScalarValue --- datafusion/proto/proto/datafusion.proto | 42 +--- datafusion/proto/src/from_proto.rs | 297 +----------------------- datafusion/proto/src/to_proto.rs | 72 +++--- 3 files changed, 44 insertions(+), 367 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index d61f52ee7bb2..160f4974770d 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -762,8 +762,9 @@ message StructValue { message ScalarValue{ oneof value { - // Null value of any type (type is encoded) - PrimitiveScalarType null_value = 19; + // was PrimitiveScalarType null_value = 19; + // Null value of any type + ArrowType null_value = 33; bool bool_value = 1; string utf8_value = 2; @@ -778,7 +779,7 @@ message ScalarValue{ uint64 uint64_value = 11; float float32_value = 12; double float64_value = 13; - //Literal Date32 value always has a unit of day + // Literal Date32 value always has a unit of day int32 date_32_value = 14; ScalarListValue list_value = 17; //WAS: ScalarType null_list_value = 18; @@ -803,41 +804,6 @@ message Decimal128{ int64 s = 3; } -// Contains all valid datafusion scalar type except for -// List -enum PrimitiveScalarType{ - - BOOL = 0; // arrow::Type::BOOL - UINT8 = 1; // arrow::Type::UINT8 - INT8 = 2; // arrow::Type::INT8 - UINT16 = 3; // represents arrow::Type fields in src/arrow/type.h - INT16 = 4; - UINT32 = 5; - INT32 = 6; - UINT64 = 7; - INT64 = 8; - FLOAT32 = 9; - FLOAT64 = 10; - UTF8 = 11; - LARGE_UTF8 = 12; - DATE32 = 13; - TIMESTAMP_MICROSECOND = 14; - TIMESTAMP_NANOSECOND = 15; - NULL = 16; - DECIMAL128 = 17; - DATE64 = 20; - TIMESTAMP_SECOND = 21; - TIMESTAMP_MILLISECOND = 22; - INTERVAL_YEARMONTH = 23; - INTERVAL_DAYTIME = 24; - INTERVAL_MONTHDAYNANO = 28; - - BINARY = 25; - LARGE_BINARY = 26; - - TIME64 = 27; -} - // Broke out into multiple message types so that type // metadata did not need to be in separate message diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 79b477b3e1ef..1890c2460372 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -202,66 +202,7 @@ impl From for WindowFrameUnits { } } -impl From for DataType { - fn from(scalar: protobuf::PrimitiveScalarType) -> Self { - match scalar { - protobuf::PrimitiveScalarType::Bool => DataType::Boolean, - protobuf::PrimitiveScalarType::Uint8 => DataType::UInt8, - protobuf::PrimitiveScalarType::Int8 => DataType::Int8, - protobuf::PrimitiveScalarType::Uint16 => DataType::UInt16, - protobuf::PrimitiveScalarType::Int16 => DataType::Int16, - protobuf::PrimitiveScalarType::Uint32 => DataType::UInt32, - protobuf::PrimitiveScalarType::Int32 => DataType::Int32, - protobuf::PrimitiveScalarType::Uint64 => DataType::UInt64, - protobuf::PrimitiveScalarType::Int64 => DataType::Int64, - protobuf::PrimitiveScalarType::Float32 => DataType::Float32, - protobuf::PrimitiveScalarType::Float64 => DataType::Float64, - protobuf::PrimitiveScalarType::Utf8 => DataType::Utf8, - protobuf::PrimitiveScalarType::LargeUtf8 => DataType::LargeUtf8, - protobuf::PrimitiveScalarType::Binary => DataType::Binary, - protobuf::PrimitiveScalarType::LargeBinary => DataType::LargeBinary, - protobuf::PrimitiveScalarType::Date32 => DataType::Date32, - protobuf::PrimitiveScalarType::Time64 => { - DataType::Time64(TimeUnit::Nanosecond) - } - protobuf::PrimitiveScalarType::TimestampMicrosecond => { - DataType::Timestamp(TimeUnit::Microsecond, None) - } - protobuf::PrimitiveScalarType::TimestampNanosecond => { - DataType::Timestamp(TimeUnit::Nanosecond, None) - } - protobuf::PrimitiveScalarType::Null => DataType::Null, - protobuf::PrimitiveScalarType::Decimal128 => DataType::Decimal128(0, 0), - protobuf::PrimitiveScalarType::Date64 => DataType::Date64, - protobuf::PrimitiveScalarType::TimestampSecond => { - DataType::Timestamp(TimeUnit::Second, None) - } - protobuf::PrimitiveScalarType::TimestampMillisecond => { - DataType::Timestamp(TimeUnit::Millisecond, None) - } - protobuf::PrimitiveScalarType::IntervalYearmonth => { - DataType::Interval(IntervalUnit::YearMonth) - } - protobuf::PrimitiveScalarType::IntervalDaytime => { - DataType::Interval(IntervalUnit::DayTime) - } - protobuf::PrimitiveScalarType::IntervalMonthdaynano => { - DataType::Interval(IntervalUnit::MonthDayNano) - } - } - } -} -impl TryFrom<&protobuf::ArrowType> for DataType { - type Error = Error; - - fn try_from(arrow_type: &protobuf::ArrowType) -> Result { - arrow_type - .arrow_type_enum - .as_ref() - .required("arrow_type_enum") - } -} impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType { type Error = Error; @@ -539,14 +480,6 @@ impl From for BuiltInWindowFunction { } } -impl TryFrom<&i32> for protobuf::PrimitiveScalarType { - type Error = Error; - - fn try_from(value: &i32) -> Result { - protobuf::PrimitiveScalarType::from_i32(*value) - .ok_or_else(|| Error::unknown("PrimitiveScalarType", *value)) - } -} impl TryFrom<&i32> for protobuf::AggregateFunction { type Error = Error; @@ -580,50 +513,6 @@ impl TryFrom<&protobuf::Schema> for Schema { } } -impl TryFrom<&protobuf::PrimitiveScalarType> for ScalarValue { - type Error = Error; - - fn try_from(scalar: &protobuf::PrimitiveScalarType) -> Result { - use protobuf::PrimitiveScalarType; - - Ok(match scalar { - PrimitiveScalarType::Null => Self::Null, - PrimitiveScalarType::Bool => Self::Boolean(None), - PrimitiveScalarType::Uint8 => Self::UInt8(None), - PrimitiveScalarType::Int8 => Self::Int8(None), - PrimitiveScalarType::Uint16 => Self::UInt16(None), - PrimitiveScalarType::Int16 => Self::Int16(None), - PrimitiveScalarType::Uint32 => Self::UInt32(None), - PrimitiveScalarType::Int32 => Self::Int32(None), - PrimitiveScalarType::Uint64 => Self::UInt64(None), - PrimitiveScalarType::Int64 => Self::Int64(None), - PrimitiveScalarType::Float32 => Self::Float32(None), - PrimitiveScalarType::Float64 => Self::Float64(None), - PrimitiveScalarType::Utf8 => Self::Utf8(None), - PrimitiveScalarType::LargeUtf8 => Self::LargeUtf8(None), - PrimitiveScalarType::Binary => Self::Binary(None), - PrimitiveScalarType::LargeBinary => Self::LargeBinary(None), - PrimitiveScalarType::Date32 => Self::Date32(None), - PrimitiveScalarType::Time64 => Self::Time64(None), - PrimitiveScalarType::TimestampMicrosecond => { - Self::TimestampMicrosecond(None, None) - } - PrimitiveScalarType::TimestampNanosecond => { - Self::TimestampNanosecond(None, None) - } - PrimitiveScalarType::Decimal128 => Self::Decimal128(None, 0, 0), - PrimitiveScalarType::Date64 => Self::Date64(None), - PrimitiveScalarType::TimestampSecond => Self::TimestampSecond(None, None), - PrimitiveScalarType::TimestampMillisecond => { - Self::TimestampMillisecond(None, None) - } - PrimitiveScalarType::IntervalYearmonth => Self::IntervalYearMonth(None), - PrimitiveScalarType::IntervalDaytime => Self::IntervalDayTime(None), - PrimitiveScalarType::IntervalMonthdaynano => Self::IntervalMonthDayNano(None), - }) - } -} - impl TryFrom<&protobuf::ScalarValue> for ScalarValue { type Error = Error; @@ -671,7 +560,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { Self::List(values, field) } Value::NullValue(v) => { - let null_type_enum = protobuf::PrimitiveScalarType::try_from(v)?; + let null_type_enum = protobuf::ArrowType::try_from(v)?; (&null_type_enum).try_into()? } Value::Decimal128Value(val) => { @@ -795,11 +684,6 @@ pub fn parse_expr( ExprType::GetIndexedField(field) => { let key = field.key.as_ref().ok_or_else(|| Error::required("value"))?; - let key = typechecked_scalar_value_conversion( - key.value.as_ref().ok_or_else(|| Error::required("value"))?, - protobuf::PrimitiveScalarType::Utf8, - )?; - let expr = parse_required_expr(&field.expr, registry, "expr")?; Ok(Expr::GetIndexedField(GetIndexedField::new( @@ -1304,15 +1188,6 @@ impl TryFrom for WindowFrameBound { } } -impl TryFrom<&Box> for DataType { - type Error = Error; - fn try_from(list: &Box) -> Result { - Ok(Self::List(Box::new( - list.as_ref().field_type.as_deref().required("field_type")?, - ))) - } -} - impl TryFrom<&i32> for protobuf::TimeUnit { type Error = Error; @@ -1359,176 +1234,6 @@ fn vec_to_array(v: Vec) -> [T; N] { }) } -//Does not typecheck lists -fn typechecked_scalar_value_conversion( - tested_type: &protobuf::scalar_value::Value, - required_type: protobuf::PrimitiveScalarType, -) -> Result { - use protobuf::{scalar_value::Value, PrimitiveScalarType}; - - Ok(match (tested_type, &required_type) { - (Value::BoolValue(v), PrimitiveScalarType::Bool) => { - ScalarValue::Boolean(Some(*v)) - } - (Value::Int8Value(v), PrimitiveScalarType::Int8) => { - ScalarValue::Int8(Some(*v as i8)) - } - (Value::Int16Value(v), PrimitiveScalarType::Int16) => { - ScalarValue::Int16(Some(*v as i16)) - } - (Value::Int32Value(v), PrimitiveScalarType::Int32) => { - ScalarValue::Int32(Some(*v)) - } - (Value::Int64Value(v), PrimitiveScalarType::Int64) => { - ScalarValue::Int64(Some(*v)) - } - (Value::Uint8Value(v), PrimitiveScalarType::Uint8) => { - ScalarValue::UInt8(Some(*v as u8)) - } - (Value::Uint16Value(v), PrimitiveScalarType::Uint16) => { - ScalarValue::UInt16(Some(*v as u16)) - } - (Value::Uint32Value(v), PrimitiveScalarType::Uint32) => { - ScalarValue::UInt32(Some(*v)) - } - (Value::Uint64Value(v), PrimitiveScalarType::Uint64) => { - ScalarValue::UInt64(Some(*v)) - } - (Value::Float32Value(v), PrimitiveScalarType::Float32) => { - ScalarValue::Float32(Some(*v)) - } - (Value::Float64Value(v), PrimitiveScalarType::Float64) => { - ScalarValue::Float64(Some(*v)) - } - (Value::Date32Value(v), PrimitiveScalarType::Date32) => { - ScalarValue::Date32(Some(*v)) - } - ( - Value::TimestampValue(protobuf::ScalarTimestampValue { - timezone, - value: - Some(protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(v)), - }), - PrimitiveScalarType::TimestampMicrosecond, - ) => ScalarValue::TimestampMicrosecond(Some(*v), unwrap_timezone(timezone)), - ( - Value::TimestampValue(protobuf::ScalarTimestampValue { - timezone, - value: - Some(protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(v)), - }), - PrimitiveScalarType::TimestampNanosecond, - ) => ScalarValue::TimestampNanosecond(Some(*v), unwrap_timezone(timezone)), - ( - Value::TimestampValue(protobuf::ScalarTimestampValue { - timezone, - value: Some(protobuf::scalar_timestamp_value::Value::TimeSecondValue(v)), - }), - PrimitiveScalarType::TimestampSecond, - ) => ScalarValue::TimestampSecond(Some(*v), unwrap_timezone(timezone)), - ( - Value::TimestampValue(protobuf::ScalarTimestampValue { - timezone, - value: - Some(protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(v)), - }), - PrimitiveScalarType::TimestampMillisecond, - ) => ScalarValue::TimestampMillisecond(Some(*v), unwrap_timezone(timezone)), - (Value::Utf8Value(v), PrimitiveScalarType::Utf8) => { - ScalarValue::Utf8(Some(v.to_owned())) - } - (Value::LargeUtf8Value(v), PrimitiveScalarType::LargeUtf8) => { - ScalarValue::LargeUtf8(Some(v.to_owned())) - } - - (Value::NullValue(i32_enum), required_scalar_type) => { - if *i32_enum == *required_scalar_type as i32 { - let pb_scalar_type = PrimitiveScalarType::try_from(i32_enum)?; - let scalar_value: ScalarValue = match pb_scalar_type { - PrimitiveScalarType::Bool => ScalarValue::Boolean(None), - PrimitiveScalarType::Uint8 => ScalarValue::UInt8(None), - PrimitiveScalarType::Int8 => ScalarValue::Int8(None), - PrimitiveScalarType::Uint16 => ScalarValue::UInt16(None), - PrimitiveScalarType::Int16 => ScalarValue::Int16(None), - PrimitiveScalarType::Uint32 => ScalarValue::UInt32(None), - PrimitiveScalarType::Int32 => ScalarValue::Int32(None), - PrimitiveScalarType::Uint64 => ScalarValue::UInt64(None), - PrimitiveScalarType::Int64 => ScalarValue::Int64(None), - PrimitiveScalarType::Float32 => ScalarValue::Float32(None), - PrimitiveScalarType::Float64 => ScalarValue::Float64(None), - PrimitiveScalarType::Utf8 => ScalarValue::Utf8(None), - PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None), - PrimitiveScalarType::Date32 => ScalarValue::Date32(None), - PrimitiveScalarType::Time64 => ScalarValue::Time64(None), - PrimitiveScalarType::TimestampMicrosecond => { - ScalarValue::TimestampMicrosecond(None, None) - } - PrimitiveScalarType::TimestampNanosecond => { - ScalarValue::TimestampNanosecond(None, None) - } - PrimitiveScalarType::Null => { - return Err(proto_error( - "Untyped scalar null is not a valid scalar value", - )); - } - PrimitiveScalarType::Decimal128 => { - ScalarValue::Decimal128(None, 0, 0) - } - PrimitiveScalarType::Date64 => ScalarValue::Date64(None), - PrimitiveScalarType::TimestampSecond => { - ScalarValue::TimestampSecond(None, None) - } - PrimitiveScalarType::TimestampMillisecond => { - ScalarValue::TimestampMillisecond(None, None) - } - PrimitiveScalarType::IntervalYearmonth => { - ScalarValue::IntervalYearMonth(None) - } - PrimitiveScalarType::IntervalDaytime => { - ScalarValue::IntervalDayTime(None) - } - PrimitiveScalarType::IntervalMonthdaynano => { - ScalarValue::IntervalMonthDayNano(None) - } - PrimitiveScalarType::Binary => ScalarValue::Binary(None), - PrimitiveScalarType::LargeBinary => ScalarValue::LargeBinary(None), - }; - scalar_value - } else { - return Err(proto_error("Could not convert to the proper type")); - } - } - (Value::Decimal128Value(val), PrimitiveScalarType::Decimal128) => { - let array = vec_to_array(val.value.clone()); - ScalarValue::Decimal128( - Some(i128::from_be_bytes(array)), - val.p as u8, - val.s as u8, - ) - } - (Value::Date64Value(v), PrimitiveScalarType::Date64) => { - ScalarValue::Date64(Some(*v)) - } - (Value::IntervalYearmonthValue(v), PrimitiveScalarType::IntervalYearmonth) => { - ScalarValue::IntervalYearMonth(Some(*v)) - } - (Value::IntervalDaytimeValue(v), PrimitiveScalarType::IntervalDaytime) => { - ScalarValue::IntervalDayTime(Some(*v)) - } - (Value::IntervalMonthDayNano(v), PrimitiveScalarType::IntervalMonthdaynano) => { - let protobuf::IntervalMonthDayNanoValue { - months, - days, - nanos, - } = v; - ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNanoType::make_value( - *months, *days, *nanos, - ))) - } - _ => return Err(proto_error("Could not convert to the proper type")), - }) -} - fn unwrap_timezone(proto_value: &str) -> Option { if proto_value.is_empty() { None diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index f8dab779b405..38228754d498 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -873,71 +873,72 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { fn try_from(val: &ScalarValue) -> Result { use datafusion_common::scalar; - use protobuf::{scalar_value::Value, PrimitiveScalarType}; + use protobuf::{scalar_value::Value}; + let data_type = val.get_datatype(); let scalar_val = match val { scalar::ScalarValue::Boolean(val) => { - create_proto_scalar(val, PrimitiveScalarType::Bool, |s| { + create_proto_scalar(val, &data_type, |s| { Value::BoolValue(*s) }) } scalar::ScalarValue::Float32(val) => { - create_proto_scalar(val, PrimitiveScalarType::Float32, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Float32Value(*s) }) } scalar::ScalarValue::Float64(val) => { - create_proto_scalar(val, PrimitiveScalarType::Float64, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Float64Value(*s) }) } scalar::ScalarValue::Int8(val) => { - create_proto_scalar(val, PrimitiveScalarType::Int8, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Int8Value(*s as i32) }) } scalar::ScalarValue::Int16(val) => { - create_proto_scalar(val, PrimitiveScalarType::Int16, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Int16Value(*s as i32) }) } scalar::ScalarValue::Int32(val) => { - create_proto_scalar(val, PrimitiveScalarType::Int32, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Int32Value(*s) }) } scalar::ScalarValue::Int64(val) => { - create_proto_scalar(val, PrimitiveScalarType::Int64, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Int64Value(*s) }) } scalar::ScalarValue::UInt8(val) => { - create_proto_scalar(val, PrimitiveScalarType::Uint8, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Uint8Value(*s as u32) }) } scalar::ScalarValue::UInt16(val) => { - create_proto_scalar(val, PrimitiveScalarType::Uint16, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Uint16Value(*s as u32) }) } scalar::ScalarValue::UInt32(val) => { - create_proto_scalar(val, PrimitiveScalarType::Uint32, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Uint32Value(*s) }) } scalar::ScalarValue::UInt64(val) => { - create_proto_scalar(val, PrimitiveScalarType::Uint64, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Uint64Value(*s) }) } scalar::ScalarValue::Utf8(val) => { - create_proto_scalar(val, PrimitiveScalarType::Utf8, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Utf8Value(s.to_owned()) }) } scalar::ScalarValue::LargeUtf8(val) => { - create_proto_scalar(val, PrimitiveScalarType::LargeUtf8, |s| { + create_proto_scalar(val, &data_type, |s| { Value::LargeUtf8Value(s.to_owned()) }) } @@ -966,12 +967,12 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { } } datafusion::scalar::ScalarValue::Date32(val) => { - create_proto_scalar(val, PrimitiveScalarType::Date32, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Date32Value(*s) }) } datafusion::scalar::ScalarValue::TimestampMicrosecond(val, tz) => { - create_proto_scalar(val, PrimitiveScalarType::TimestampMicrosecond, |s| { + create_proto_scalar(val, &data_type, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), value: Some( @@ -983,7 +984,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } datafusion::scalar::ScalarValue::TimestampNanosecond(val, tz) => { - create_proto_scalar(val, PrimitiveScalarType::TimestampNanosecond, |s| { + create_proto_scalar(val, &data_type, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), value: Some( @@ -1008,17 +1009,17 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { } None => protobuf::ScalarValue { value: Some(protobuf::scalar_value::Value::NullValue( - PrimitiveScalarType::Decimal128 as i32, + data_type.try_into()?, )), }, }, datafusion::scalar::ScalarValue::Date64(val) => { - create_proto_scalar(val, PrimitiveScalarType::Date64, |s| { + create_proto_scalar(val, &data_type, |s| { Value::Date64Value(*s) }) } datafusion::scalar::ScalarValue::TimestampSecond(val, tz) => { - create_proto_scalar(val, PrimitiveScalarType::TimestampSecond, |s| { + create_proto_scalar(val, &data_type, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), value: Some( @@ -1028,7 +1029,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } datafusion::scalar::ScalarValue::TimestampMillisecond(val, tz) => { - create_proto_scalar(val, PrimitiveScalarType::TimestampMillisecond, |s| { + create_proto_scalar(val, &data_type, |s| { Value::TimestampValue(protobuf::ScalarTimestampValue { timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(), value: Some( @@ -1040,26 +1041,26 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } datafusion::scalar::ScalarValue::IntervalYearMonth(val) => { - create_proto_scalar(val, PrimitiveScalarType::IntervalYearmonth, |s| { + create_proto_scalar(val, &data_type, |s| { Value::IntervalYearmonthValue(*s) }) } datafusion::scalar::ScalarValue::IntervalDayTime(val) => { - create_proto_scalar(val, PrimitiveScalarType::IntervalDaytime, |s| { + create_proto_scalar(val, &data_type, |s| { Value::IntervalDaytimeValue(*s) }) } datafusion::scalar::ScalarValue::Null => protobuf::ScalarValue { - value: Some(Value::NullValue(PrimitiveScalarType::Null as i32)), + value: Some(Value::NullValue(data_type.try_into()?)), }, scalar::ScalarValue::Binary(val) => { - create_proto_scalar(val, PrimitiveScalarType::Binary, |s| { + create_proto_scalar(val, &data_type, |s| { Value::BinaryValue(s.to_owned()) }) } scalar::ScalarValue::LargeBinary(val) => { - create_proto_scalar(val, PrimitiveScalarType::LargeBinary, |s| { + create_proto_scalar(val, &data_type, |s| { Value::LargeBinaryValue(s.to_owned()) }) } @@ -1070,7 +1071,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { } datafusion::scalar::ScalarValue::Time64(v) => { - create_proto_scalar(v, PrimitiveScalarType::Time64, |v| { + create_proto_scalar(v, &data_type, |v| { Value::Time64Value(*v) }) } @@ -1084,8 +1085,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { nanos, }) } else { - let null_arrow_type = PrimitiveScalarType::IntervalMonthdaynano; - protobuf::scalar_value::Value::NullValue(null_arrow_type as i32) + protobuf::scalar_value::Value::NullValue(data_type.try_into()?) }; protobuf::ScalarValue { value: Some(value) } @@ -1237,14 +1237,20 @@ impl From<&IntervalUnit> for protobuf::IntervalUnit { } } +/// Creates a scalar protobuf value from an optional value (T), and +/// encoding None as the appropriate datatype fn create_proto_scalar protobuf::scalar_value::Value>( v: &Option, - null_arrow_type: protobuf::PrimitiveScalarType, + null_arrow_type: &DataType, constructor: T, ) -> protobuf::ScalarValue { + + let value = v.as_ref().map(constructor).unwrap_or( + protobuf::scalar_value::Value::NullValue(null_arrow_type.try_into()?), + ); + + protobuf::ScalarValue { - value: Some(v.as_ref().map(constructor).unwrap_or( - protobuf::scalar_value::Value::NullValue(null_arrow_type as i32), - )), + value: Some(value), } } From d8fa6b1f76b1379433475222b41f6c32a20b7711 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 23 Oct 2022 06:50:45 -0400 Subject: [PATCH 2/5] comments --- datafusion/proto/proto/datafusion.proto | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 160f4974770d..48dbda4b80a8 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -804,13 +804,9 @@ message Decimal128{ int64 s = 3; } - -// Broke out into multiple message types so that type -// metadata did not need to be in separate message -// All types that are of the empty message types contain no additional metadata -// about the type +// Serialized data type message ArrowType{ - oneof arrow_type_enum{ + oneof arrow_type_enum { EmptyMessage NONE = 1; // arrow::Type::NA EmptyMessage BOOL = 2; // arrow::Type::BOOL EmptyMessage UINT8 = 3; // arrow::Type::UINT8 From ebcc6a0e0d2c37b1700cf4643a5b559bb7c752af Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 23 Oct 2022 07:11:30 -0400 Subject: [PATCH 3/5] it compiles --- datafusion/common/src/scalar.rs | 9 ++ datafusion/proto/src/from_proto.rs | 20 ++++- datafusion/proto/src/to_proto.rs | 135 ++++++++++------------------- 3 files changed, 73 insertions(+), 91 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 0afb28c6b2a8..af510efd578e 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2238,6 +2238,15 @@ impl_try_from!(Float32, f32); impl_try_from!(Float64, f64); impl_try_from!(Boolean, bool); +impl TryFrom for ScalarValue { + type Error = DataFusionError; + + /// Create a Null instance of ScalarValue for this datatype + fn try_from(datatype: DataType) -> Result { + (&datatype).try_into() + } +} + impl TryFrom<&DataType> for ScalarValue { type Error = DataFusionError; diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 1890c2460372..e71cd7c5f9c9 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -202,7 +202,16 @@ impl From for WindowFrameUnits { } } +impl TryFrom<&protobuf::ArrowType> for DataType { + type Error = Error; + fn try_from(arrow_type: &protobuf::ArrowType) -> Result { + arrow_type + .arrow_type_enum + .as_ref() + .required("arrow_type_enum") + } +} impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType { type Error = Error; @@ -480,7 +489,6 @@ impl From for BuiltInWindowFunction { } } - impl TryFrom<&i32> for protobuf::AggregateFunction { type Error = Error; @@ -560,8 +568,8 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { Self::List(values, field) } Value::NullValue(v) => { - let null_type_enum = protobuf::ArrowType::try_from(v)?; - (&null_type_enum).try_into()? + let null_type: DataType = v.try_into()?; + null_type.try_into().map_err(Error::DataFusionError)? } Value::Decimal128Value(val) => { let array = vec_to_array(val.value.clone()); @@ -682,7 +690,11 @@ pub fn parse_expr( Box::new(parse_required_expr(&binary_expr.r, registry, "r")?), ))), ExprType::GetIndexedField(field) => { - let key = field.key.as_ref().ok_or_else(|| Error::required("value"))?; + let key = field + .key + .as_ref() + .ok_or_else(|| Error::required("value"))? + .try_into()?; let expr = parse_required_expr(&field.expr, registry, "expr")?; diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index 38228754d498..5ef1285e513e 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -873,69 +873,45 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { fn try_from(val: &ScalarValue) -> Result { use datafusion_common::scalar; - use protobuf::{scalar_value::Value}; + use protobuf::scalar_value::Value; let data_type = val.get_datatype(); - let scalar_val = match val { + match val { scalar::ScalarValue::Boolean(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::BoolValue(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::BoolValue(*s)) } scalar::ScalarValue::Float32(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Float32Value(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::Float32Value(*s)) } scalar::ScalarValue::Float64(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Float64Value(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::Float64Value(*s)) } scalar::ScalarValue::Int8(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Int8Value(*s as i32) - }) + create_proto_scalar(val, &data_type, |s| Value::Int8Value(*s as i32)) } scalar::ScalarValue::Int16(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Int16Value(*s as i32) - }) + create_proto_scalar(val, &data_type, |s| Value::Int16Value(*s as i32)) } scalar::ScalarValue::Int32(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Int32Value(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::Int32Value(*s)) } scalar::ScalarValue::Int64(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Int64Value(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::Int64Value(*s)) } scalar::ScalarValue::UInt8(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Uint8Value(*s as u32) - }) + create_proto_scalar(val, &data_type, |s| Value::Uint8Value(*s as u32)) } scalar::ScalarValue::UInt16(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Uint16Value(*s as u32) - }) + create_proto_scalar(val, &data_type, |s| Value::Uint16Value(*s as u32)) } scalar::ScalarValue::UInt32(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Uint32Value(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::Uint32Value(*s)) } scalar::ScalarValue::UInt64(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Uint64Value(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::Uint64Value(*s)) } scalar::ScalarValue::Utf8(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Utf8Value(s.to_owned()) - }) + create_proto_scalar(val, &data_type, |s| Value::Utf8Value(s.to_owned())) } scalar::ScalarValue::LargeUtf8(val) => { create_proto_scalar(val, &data_type, |s| { @@ -956,7 +932,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { let field = boxed_field.as_ref().try_into()?; - protobuf::ScalarValue { + Ok(protobuf::ScalarValue { value: Some(protobuf::scalar_value::Value::ListValue( protobuf::ScalarListValue { is_null, @@ -964,12 +940,10 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { values, }, )), - } + }) } datafusion::scalar::ScalarValue::Date32(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Date32Value(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::Date32Value(*s)) } datafusion::scalar::ScalarValue::TimestampMicrosecond(val, tz) => { create_proto_scalar(val, &data_type, |s| { @@ -999,24 +973,22 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { Some(v) => { let array = v.to_be_bytes(); let vec_val: Vec = array.to_vec(); - protobuf::ScalarValue { + Ok(protobuf::ScalarValue { value: Some(Value::Decimal128Value(protobuf::Decimal128 { value: vec_val, p: *p as i64, s: *s as i64, })), - } + }) } - None => protobuf::ScalarValue { + None => Ok(protobuf::ScalarValue { value: Some(protobuf::scalar_value::Value::NullValue( - data_type.try_into()?, + (&data_type).try_into()?, )), - }, + }), }, datafusion::scalar::ScalarValue::Date64(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::Date64Value(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::Date64Value(*s)) } datafusion::scalar::ScalarValue::TimestampSecond(val, tz) => { create_proto_scalar(val, &data_type, |s| { @@ -1046,34 +1018,26 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } datafusion::scalar::ScalarValue::IntervalDayTime(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::IntervalDaytimeValue(*s) - }) + create_proto_scalar(val, &data_type, |s| Value::IntervalDaytimeValue(*s)) } - datafusion::scalar::ScalarValue::Null => protobuf::ScalarValue { - value: Some(Value::NullValue(data_type.try_into()?)), - }, + datafusion::scalar::ScalarValue::Null => Ok(protobuf::ScalarValue { + value: Some(Value::NullValue((&data_type).try_into()?)), + }), scalar::ScalarValue::Binary(val) => { - create_proto_scalar(val, &data_type, |s| { - Value::BinaryValue(s.to_owned()) - }) + create_proto_scalar(val, &data_type, |s| Value::BinaryValue(s.to_owned())) } scalar::ScalarValue::LargeBinary(val) => { create_proto_scalar(val, &data_type, |s| { Value::LargeBinaryValue(s.to_owned()) }) } - scalar::ScalarValue::FixedSizeBinary(_, _) => { - return Err(Error::General( - "FixedSizeBinary is not yet implemented".to_owned(), - )) - } + scalar::ScalarValue::FixedSizeBinary(_, _) => Err(Error::General( + "FixedSizeBinary is not yet implemented".to_owned(), + )), datafusion::scalar::ScalarValue::Time64(v) => { - create_proto_scalar(v, &data_type, |v| { - Value::Time64Value(*v) - }) + create_proto_scalar(v, &data_type, |v| Value::Time64Value(*v)) } datafusion::scalar::ScalarValue::IntervalMonthDayNano(v) => { @@ -1085,10 +1049,10 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { nanos, }) } else { - protobuf::scalar_value::Value::NullValue(data_type.try_into()?) + protobuf::scalar_value::Value::NullValue((&data_type).try_into()?) }; - protobuf::ScalarValue { value: Some(value) } + Ok(protobuf::ScalarValue { value: Some(value) }) } datafusion::scalar::ScalarValue::Struct(values, fields) => { @@ -1110,28 +1074,26 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { .map(|f| f.try_into()) .collect::, _>>()?; - protobuf::ScalarValue { + Ok(protobuf::ScalarValue { value: Some(Value::StructValue(protobuf::StructValue { field_values, fields, })), - } + }) } datafusion::scalar::ScalarValue::Dictionary(index_type, val) => { let value: protobuf::ScalarValue = val.as_ref().try_into()?; - protobuf::ScalarValue { + Ok(protobuf::ScalarValue { value: Some(Value::DictionaryValue(Box::new( protobuf::ScalarDictionaryValue { index_type: Some(index_type.as_ref().try_into()?), value: Some(Box::new(value)), }, ))), - } + }) } - }; - - Ok(scalar_val) + } } } @@ -1243,14 +1205,13 @@ fn create_proto_scalar protobuf::scalar_value::Value>( v: &Option, null_arrow_type: &DataType, constructor: T, -) -> protobuf::ScalarValue { - - let value = v.as_ref().map(constructor).unwrap_or( - protobuf::scalar_value::Value::NullValue(null_arrow_type.try_into()?), - ); - - - protobuf::ScalarValue { - value: Some(value), - } +) -> Result { + let value = + v.as_ref() + .map(constructor) + .unwrap_or(protobuf::scalar_value::Value::NullValue( + null_arrow_type.try_into()?, + )); + + Ok(protobuf::ScalarValue { value: Some(value) }) } From d754b9451a08e692e6dc27f22ba2e8c511b73bdf Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 23 Oct 2022 07:22:59 -0400 Subject: [PATCH 4/5] Add additional scalar value null construction --- datafusion/common/src/scalar.rs | 12 ++++++++++++ datafusion/proto/src/from_proto.rs | 8 -------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index af510efd578e..62e7f0753102 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2269,6 +2269,9 @@ impl TryFrom<&DataType> for ScalarValue { } DataType::Utf8 => ScalarValue::Utf8(None), DataType::LargeUtf8 => ScalarValue::LargeUtf8(None), + DataType::Binary => ScalarValue::Binary(None), + DataType::FixedSizeBinary(len) => ScalarValue::FixedSizeBinary(*len, None), + DataType::LargeBinary => ScalarValue::LargeBinary(None), DataType::Date32 => ScalarValue::Date32(None), DataType::Date64 => ScalarValue::Date64(None), DataType::Time64(TimeUnit::Nanosecond) => ScalarValue::Time64(None), @@ -2284,6 +2287,15 @@ impl TryFrom<&DataType> for ScalarValue { DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => { ScalarValue::TimestampNanosecond(None, tz_opt.clone()) } + DataType::Interval(IntervalUnit::YearMonth) => { + ScalarValue::IntervalYearMonth(None) + } + DataType::Interval(IntervalUnit::DayTime) => { + ScalarValue::IntervalDayTime(None) + } + DataType::Interval(IntervalUnit::MonthDayNano) => { + ScalarValue::IntervalMonthDayNano(None) + } DataType::Dictionary(index_type, value_type) => ScalarValue::Dictionary( index_type.clone(), Box::new(value_type.as_ref().try_into()?), diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index e71cd7c5f9c9..ba4b8b6fc633 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -1246,14 +1246,6 @@ fn vec_to_array(v: Vec) -> [T; N] { }) } -fn unwrap_timezone(proto_value: &str) -> Option { - if proto_value.is_empty() { - None - } else { - Some(proto_value.to_string()) - } -} - pub fn from_proto_binary_op(op: &str) -> Result { match op { "And" => Ok(Operator::And), From 595c9706782606ae7bc33d0e1464f40c8179f96a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 24 Oct 2022 15:48:41 -0400 Subject: [PATCH 5/5] reserve old field name --- datafusion/proto/proto/datafusion.proto | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 48dbda4b80a8..643ea91f100f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -761,7 +761,10 @@ message StructValue { } message ScalarValue{ - oneof value { + // was PrimitiveScalarType null_value = 19; + reserved 19; + + oneof value { // was PrimitiveScalarType null_value = 19; // Null value of any type ArrowType null_value = 33;