Skip to content

Commit

Permalink
Add serialization of `ScalarValue::Binary`, `ScalarValue::LargeBinary`, and `ScalarValue::Time64`
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 19, 2022
1 parent e1b4689 commit 677df35
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 40 deletions.
16 changes: 12 additions & 4 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,9 @@ message ScalarValue{
int64 interval_daytime_value = 25;
ScalarTimestampValue timestamp_value = 26;
ScalarDictionaryValue dictionary_value = 27;
bytes binary_value = 28;
bytes large_binary_value = 29;
int64 time64_value = 30;
}
}

Expand Down Expand Up @@ -786,16 +789,21 @@ enum PrimitiveScalarType{
UTF8 = 11;
LARGE_UTF8 = 12;
DATE32 = 13;
TIME_MICROSECOND = 14;
TIME_NANOSECOND = 15;
TIMESTAMP_MICROSECOND = 14;
TIMESTAMP_NANOSECOND = 15;
NULL = 16;

DECIMAL128 = 17;
DATE64 = 20;
TIME_SECOND = 21;
TIME_MILLISECOND = 22;
TIMESTAMP_SECOND = 21;
TIMESTAMP_MILLISECOND = 22;
INTERVAL_YEARMONTH = 23;
INTERVAL_DAYTIME = 24;

BINARY = 25;
LARGE_BINARY = 26;

TIME64 = 27;
}

message ScalarType{
Expand Down
52 changes: 34 additions & 18 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,25 @@ impl From<protobuf::PrimitiveScalarType> for DataType {
protobuf::PrimitiveScalarType::Float64 => DataType::Float64,
protobuf::PrimitiveScalarType::Utf8 => DataType::Utf8,
protobuf::PrimitiveScalarType::LargeUtf8 => DataType::LargeUtf8,
protobuf::PrimitiveScalarType::Binary => DataType::Binary,
protobuf::PrimitiveScalarType::LargeBinary => DataType::LargeBinary,
protobuf::PrimitiveScalarType::Date32 => DataType::Date32,
protobuf::PrimitiveScalarType::TimeMicrosecond => {
DataType::Time64(TimeUnit::Microsecond)
}
protobuf::PrimitiveScalarType::TimeNanosecond => {
protobuf::PrimitiveScalarType::Time64 => {
DataType::Time64(TimeUnit::Nanosecond)
}
protobuf::PrimitiveScalarType::TimestampMicrosecond => {
DataType::Timestamp(TimeUnit::Microsecond, None)
}
protobuf::PrimitiveScalarType::TimestampNanosecond => {
DataType::Timestamp(TimeUnit::Nanosecond, None)
}
protobuf::PrimitiveScalarType::Null => DataType::Null,
protobuf::PrimitiveScalarType::Decimal128 => DataType::Decimal128(0, 0),
protobuf::PrimitiveScalarType::Date64 => DataType::Date64,
protobuf::PrimitiveScalarType::TimeSecond => {
protobuf::PrimitiveScalarType::TimestampSecond => {
DataType::Timestamp(TimeUnit::Second, None)
}
protobuf::PrimitiveScalarType::TimeMillisecond => {
protobuf::PrimitiveScalarType::TimestampMillisecond => {
DataType::Timestamp(TimeUnit::Millisecond, None)
}
protobuf::PrimitiveScalarType::IntervalYearmonth => {
Expand Down Expand Up @@ -643,15 +648,20 @@ impl TryFrom<&protobuf::PrimitiveScalarType> for ScalarValue {
PrimitiveScalarType::Float64 => Self::Float64(None),
PrimitiveScalarType::Utf8 => Self::Utf8(None),
PrimitiveScalarType::LargeUtf8 => Self::LargeUtf8(None),
PrimitiveScalarType::Binary => Self::Binary(None),
PrimitiveScalarType::LargeBinary => Self::LargeBinary(None),
PrimitiveScalarType::Date32 => Self::Date32(None),
PrimitiveScalarType::TimeMicrosecond => {
PrimitiveScalarType::Time64 => Self::Time64(None),
PrimitiveScalarType::TimestampMicrosecond => {
Self::TimestampMicrosecond(None, None)
}
PrimitiveScalarType::TimeNanosecond => Self::TimestampNanosecond(None, None),
PrimitiveScalarType::TimestampNanosecond => {
Self::TimestampNanosecond(None, None)
}
PrimitiveScalarType::Decimal128 => Self::Decimal128(None, 0, 0),
PrimitiveScalarType::Date64 => Self::Date64(None),
PrimitiveScalarType::TimeSecond => Self::TimestampSecond(None, None),
PrimitiveScalarType::TimeMillisecond => {
PrimitiveScalarType::TimestampSecond => Self::TimestampSecond(None, None),
PrimitiveScalarType::TimestampMillisecond => {
Self::TimestampMillisecond(None, None)
}
PrimitiveScalarType::IntervalYearmonth => Self::IntervalYearMonth(None),
Expand Down Expand Up @@ -749,6 +759,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
)
}
Value::Date64Value(v) => Self::Date64(Some(*v)),
Value::Time64Value(v) => Self::Time64(Some(*v)),
Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(*v)),
Value::TimestampValue(v) => {
Expand Down Expand Up @@ -792,6 +803,8 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {

Self::Dictionary(Box::new(index_type), Box::new(value))
}
Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
})
}
}
Expand Down Expand Up @@ -1419,30 +1432,30 @@ fn typechecked_scalar_value_conversion(
value:
Some(protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(v)),
}),
PrimitiveScalarType::TimeMicrosecond,
PrimitiveScalarType::TimestampMicrosecond,
) => ScalarValue::TimestampMicrosecond(Some(*v), unwrap_timezone(timezone)),
(
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone,
value:
Some(protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(v)),
}),
PrimitiveScalarType::TimeNanosecond,
PrimitiveScalarType::TimestampNanosecond,
) => ScalarValue::TimestampNanosecond(Some(*v), unwrap_timezone(timezone)),
(
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone,
value: Some(protobuf::scalar_timestamp_value::Value::TimeSecondValue(v)),
}),
PrimitiveScalarType::TimeSecond,
PrimitiveScalarType::TimestampSecond,
) => ScalarValue::TimestampSecond(Some(*v), unwrap_timezone(timezone)),
(
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone,
value:
Some(protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(v)),
}),
PrimitiveScalarType::TimeMillisecond,
PrimitiveScalarType::TimestampMillisecond,
) => ScalarValue::TimestampMillisecond(Some(*v), unwrap_timezone(timezone)),
(Value::Utf8Value(v), PrimitiveScalarType::Utf8) => {
ScalarValue::Utf8(Some(v.to_owned()))
Expand All @@ -1469,10 +1482,11 @@ fn typechecked_scalar_value_conversion(
PrimitiveScalarType::Utf8 => ScalarValue::Utf8(None),
PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None),
PrimitiveScalarType::Date32 => ScalarValue::Date32(None),
PrimitiveScalarType::TimeMicrosecond => {
PrimitiveScalarType::Time64 => ScalarValue::Time64(None),
PrimitiveScalarType::TimestampMicrosecond => {
ScalarValue::TimestampMicrosecond(None, None)
}
PrimitiveScalarType::TimeNanosecond => {
PrimitiveScalarType::TimestampNanosecond => {
ScalarValue::TimestampNanosecond(None, None)
}
PrimitiveScalarType::Null => {
Expand All @@ -1484,10 +1498,10 @@ fn typechecked_scalar_value_conversion(
ScalarValue::Decimal128(None, 0, 0)
}
PrimitiveScalarType::Date64 => ScalarValue::Date64(None),
PrimitiveScalarType::TimeSecond => {
PrimitiveScalarType::TimestampSecond => {
ScalarValue::TimestampSecond(None, None)
}
PrimitiveScalarType::TimeMillisecond => {
PrimitiveScalarType::TimestampMillisecond => {
ScalarValue::TimestampMillisecond(None, None)
}
PrimitiveScalarType::IntervalYearmonth => {
Expand All @@ -1496,6 +1510,8 @@ fn typechecked_scalar_value_conversion(
PrimitiveScalarType::IntervalDaytime => {
ScalarValue::IntervalDayTime(None)
}
PrimitiveScalarType::Binary => ScalarValue::Binary(None),
PrimitiveScalarType::LargeBinary => ScalarValue::LargeBinary(None),
};
scalar_value
} else {
Expand Down
8 changes: 8 additions & 0 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ mod roundtrip_tests {
ScalarValue::LargeUtf8(Some(String::from("Test Large utf8"))),
ScalarValue::Date32(Some(0)),
ScalarValue::Date32(Some(i32::MAX)),
ScalarValue::Date32(None),
ScalarValue::Time64(Some(0)),
ScalarValue::Time64(Some(i64::MAX)),
ScalarValue::Time64(None),
ScalarValue::TimestampNanosecond(Some(0), None),
ScalarValue::TimestampNanosecond(Some(i64::MAX), None),
ScalarValue::TimestampNanosecond(Some(0), Some("UTC".to_string())),
Expand Down Expand Up @@ -459,6 +463,10 @@ mod roundtrip_tests {
Box::new(DataType::Int32),
Box::new(ScalarValue::Utf8(None)),
),
ScalarValue::Binary(Some(b"bar".to_vec())),
ScalarValue::Binary(None),
ScalarValue::LargeBinary(Some(b"bar".to_vec())),
ScalarValue::LargeBinary(None),
];

for test_case in should_pass.into_iter() {
Expand Down
40 changes: 22 additions & 18 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampMicrosecond(val, tz) => {
create_proto_scalar(val, PrimitiveScalarType::TimeMicrosecond, |s| {
create_proto_scalar(val, PrimitiveScalarType::TimestampMicrosecond, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand All @@ -1110,7 +1110,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampNanosecond(val, tz) => {
create_proto_scalar(val, PrimitiveScalarType::TimeNanosecond, |s| {
create_proto_scalar(val, PrimitiveScalarType::TimestampNanosecond, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand Down Expand Up @@ -1145,7 +1145,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampSecond(val, tz) => {
create_proto_scalar(val, PrimitiveScalarType::TimeSecond, |s| {
create_proto_scalar(val, PrimitiveScalarType::TimestampSecond, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand All @@ -1155,7 +1155,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampMillisecond(val, tz) => {
create_proto_scalar(val, PrimitiveScalarType::TimeMillisecond, |s| {
create_proto_scalar(val, PrimitiveScalarType::TimestampMillisecond, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand All @@ -1180,19 +1180,21 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
value: Some(Value::NullValue(PrimitiveScalarType::Null as i32)),
},

datafusion::scalar::ScalarValue::Binary(_) => {
// not yet implemented (TODO file ticket)
return Err(Error::invalid_scalar_value(val));
scalar::ScalarValue::Binary(val) => {
create_proto_scalar(val, PrimitiveScalarType::Binary, |s| {
Value::BinaryValue(s.to_owned())
})
}

datafusion::scalar::ScalarValue::LargeBinary(_) => {
// not yet implemented (TODO file ticket)
return Err(Error::invalid_scalar_value(val));
scalar::ScalarValue::LargeBinary(val) => {
create_proto_scalar(val, PrimitiveScalarType::LargeBinary, |s| {
Value::LargeBinaryValue(s.to_owned())
})
}

datafusion::scalar::ScalarValue::Time64(_) => {
// not yet implemented (TODO file ticket)
return Err(Error::invalid_scalar_value(val));
datafusion::scalar::ScalarValue::Time64(v) => {
create_proto_scalar(v, PrimitiveScalarType::Time64, |v| {
Value::Time64Value(*v)
})
}

datafusion::scalar::ScalarValue::IntervalMonthDayNano(_) => {
Expand Down Expand Up @@ -1335,10 +1337,10 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
DataType::Date32 => Self::Scalar(PrimitiveScalarType::Date32 as i32),
DataType::Time64(time_unit) => match time_unit {
TimeUnit::Microsecond => {
Self::Scalar(PrimitiveScalarType::TimeMicrosecond as i32)
Self::Scalar(PrimitiveScalarType::TimestampMicrosecond as i32)
}
TimeUnit::Nanosecond => {
Self::Scalar(PrimitiveScalarType::TimeNanosecond as i32)
Self::Scalar(PrimitiveScalarType::TimestampNanosecond as i32)
}
_ => {
return Err(Error::invalid_time_unit(time_unit));
Expand Down Expand Up @@ -1379,8 +1381,10 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
DataType::Float64 => PrimitiveScalarType::Float64,
DataType::Date32 => PrimitiveScalarType::Date32,
DataType::Time64(time_unit) => match time_unit {
TimeUnit::Microsecond => PrimitiveScalarType::TimeMicrosecond,
TimeUnit::Nanosecond => PrimitiveScalarType::TimeNanosecond,
TimeUnit::Microsecond => {
PrimitiveScalarType::TimestampMicrosecond
}
TimeUnit::Nanosecond => PrimitiveScalarType::TimestampNanosecond,
_ => {
return Err(Error::invalid_time_unit(time_unit));
}
Expand Down

0 comments on commit 677df35

Please sign in to comment.