Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serialization of ScalarValue::Binary and ScalarValue::LargeBinary, ScalarValue::Time64 #3534

Merged
merged 1 commit into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,9 @@ message ScalarValue{
int64 interval_daytime_value = 25;
ScalarTimestampValue timestamp_value = 26;
ScalarDictionaryValue dictionary_value = 27;
bytes binary_value = 28;
bytes large_binary_value = 29;
int64 time64_value = 30;
}
}

Expand Down Expand Up @@ -786,16 +789,21 @@ enum PrimitiveScalarType{
UTF8 = 11;
LARGE_UTF8 = 12;
DATE32 = 13;
TIME_MICROSECOND = 14;
TIME_NANOSECOND = 15;
TIMESTAMP_MICROSECOND = 14;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed these enum values because they describe Timestamp types, not Time types (the two are distinct in Arrow).

TIMESTAMP_NANOSECOND = 15;
NULL = 16;

DECIMAL128 = 17;
DATE64 = 20;
TIME_SECOND = 21;
TIME_MILLISECOND = 22;
TIMESTAMP_SECOND = 21;
TIMESTAMP_MILLISECOND = 22;
INTERVAL_YEARMONTH = 23;
INTERVAL_DAYTIME = 24;

BINARY = 25;
LARGE_BINARY = 26;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interestingly TIME_NANOSECOND already existed

Copy link
Contributor Author

@alamb alamb Sep 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a huge fan of the duplication between PrimitiveScalarType and ArrowType -- I am just following the existing patterns in this PR, but I will attempt to fix this in a follow-on PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out this was a bug in my original implementation (which was caught by #3537)


TIME64 = 27;
}

message ScalarType{
Expand Down
52 changes: 34 additions & 18 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,25 @@ impl From<protobuf::PrimitiveScalarType> for DataType {
protobuf::PrimitiveScalarType::Float64 => DataType::Float64,
protobuf::PrimitiveScalarType::Utf8 => DataType::Utf8,
protobuf::PrimitiveScalarType::LargeUtf8 => DataType::LargeUtf8,
protobuf::PrimitiveScalarType::Binary => DataType::Binary,
protobuf::PrimitiveScalarType::LargeBinary => DataType::LargeBinary,
protobuf::PrimitiveScalarType::Date32 => DataType::Date32,
protobuf::PrimitiveScalarType::TimeMicrosecond => {
DataType::Time64(TimeUnit::Microsecond)
}
protobuf::PrimitiveScalarType::TimeNanosecond => {
protobuf::PrimitiveScalarType::Time64 => {
DataType::Time64(TimeUnit::Nanosecond)
}
protobuf::PrimitiveScalarType::TimestampMicrosecond => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the names of the PrimitiveScalarType variants here from Time to Timestamp to be consistent with the ScalarValue variants as well as the Arrow type system.

DataType::Timestamp(TimeUnit::Microsecond, None)
}
protobuf::PrimitiveScalarType::TimestampNanosecond => {
DataType::Timestamp(TimeUnit::Nanosecond, None)
}
protobuf::PrimitiveScalarType::Null => DataType::Null,
protobuf::PrimitiveScalarType::Decimal128 => DataType::Decimal128(0, 0),
protobuf::PrimitiveScalarType::Date64 => DataType::Date64,
protobuf::PrimitiveScalarType::TimeSecond => {
protobuf::PrimitiveScalarType::TimestampSecond => {
DataType::Timestamp(TimeUnit::Second, None)
}
protobuf::PrimitiveScalarType::TimeMillisecond => {
protobuf::PrimitiveScalarType::TimestampMillisecond => {
DataType::Timestamp(TimeUnit::Millisecond, None)
}
protobuf::PrimitiveScalarType::IntervalYearmonth => {
Expand Down Expand Up @@ -643,15 +648,20 @@ impl TryFrom<&protobuf::PrimitiveScalarType> for ScalarValue {
PrimitiveScalarType::Float64 => Self::Float64(None),
PrimitiveScalarType::Utf8 => Self::Utf8(None),
PrimitiveScalarType::LargeUtf8 => Self::LargeUtf8(None),
PrimitiveScalarType::Binary => Self::Binary(None),
PrimitiveScalarType::LargeBinary => Self::LargeBinary(None),
PrimitiveScalarType::Date32 => Self::Date32(None),
PrimitiveScalarType::TimeMicrosecond => {
PrimitiveScalarType::Time64 => Self::Time64(None),
PrimitiveScalarType::TimestampMicrosecond => {
Self::TimestampMicrosecond(None, None)
}
PrimitiveScalarType::TimeNanosecond => Self::TimestampNanosecond(None, None),
PrimitiveScalarType::TimestampNanosecond => {
Self::TimestampNanosecond(None, None)
}
PrimitiveScalarType::Decimal128 => Self::Decimal128(None, 0, 0),
PrimitiveScalarType::Date64 => Self::Date64(None),
PrimitiveScalarType::TimeSecond => Self::TimestampSecond(None, None),
PrimitiveScalarType::TimeMillisecond => {
PrimitiveScalarType::TimestampSecond => Self::TimestampSecond(None, None),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were previously misnamed Time rather than Timestamp.

PrimitiveScalarType::TimestampMillisecond => {
Self::TimestampMillisecond(None, None)
}
PrimitiveScalarType::IntervalYearmonth => Self::IntervalYearMonth(None),
Expand Down Expand Up @@ -749,6 +759,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
)
}
Value::Date64Value(v) => Self::Date64(Some(*v)),
Value::Time64Value(v) => Self::Time64(Some(*v)),
Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(*v)),
Value::TimestampValue(v) => {
Expand Down Expand Up @@ -792,6 +803,8 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {

Self::Dictionary(Box::new(index_type), Box::new(value))
}
Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
})
}
}
Expand Down Expand Up @@ -1419,30 +1432,30 @@ fn typechecked_scalar_value_conversion(
value:
Some(protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(v)),
}),
PrimitiveScalarType::TimeMicrosecond,
PrimitiveScalarType::TimestampMicrosecond,
) => ScalarValue::TimestampMicrosecond(Some(*v), unwrap_timezone(timezone)),
(
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone,
value:
Some(protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(v)),
}),
PrimitiveScalarType::TimeNanosecond,
PrimitiveScalarType::TimestampNanosecond,
) => ScalarValue::TimestampNanosecond(Some(*v), unwrap_timezone(timezone)),
(
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone,
value: Some(protobuf::scalar_timestamp_value::Value::TimeSecondValue(v)),
}),
PrimitiveScalarType::TimeSecond,
PrimitiveScalarType::TimestampSecond,
) => ScalarValue::TimestampSecond(Some(*v), unwrap_timezone(timezone)),
(
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone,
value:
Some(protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(v)),
}),
PrimitiveScalarType::TimeMillisecond,
PrimitiveScalarType::TimestampMillisecond,
) => ScalarValue::TimestampMillisecond(Some(*v), unwrap_timezone(timezone)),
(Value::Utf8Value(v), PrimitiveScalarType::Utf8) => {
ScalarValue::Utf8(Some(v.to_owned()))
Expand All @@ -1469,10 +1482,11 @@ fn typechecked_scalar_value_conversion(
PrimitiveScalarType::Utf8 => ScalarValue::Utf8(None),
PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None),
PrimitiveScalarType::Date32 => ScalarValue::Date32(None),
PrimitiveScalarType::TimeMicrosecond => {
PrimitiveScalarType::Time64 => ScalarValue::Time64(None),
PrimitiveScalarType::TimestampMicrosecond => {
ScalarValue::TimestampMicrosecond(None, None)
}
PrimitiveScalarType::TimeNanosecond => {
PrimitiveScalarType::TimestampNanosecond => {
ScalarValue::TimestampNanosecond(None, None)
}
PrimitiveScalarType::Null => {
Expand All @@ -1484,10 +1498,10 @@ fn typechecked_scalar_value_conversion(
ScalarValue::Decimal128(None, 0, 0)
}
PrimitiveScalarType::Date64 => ScalarValue::Date64(None),
PrimitiveScalarType::TimeSecond => {
PrimitiveScalarType::TimestampSecond => {
ScalarValue::TimestampSecond(None, None)
}
PrimitiveScalarType::TimeMillisecond => {
PrimitiveScalarType::TimestampMillisecond => {
ScalarValue::TimestampMillisecond(None, None)
}
PrimitiveScalarType::IntervalYearmonth => {
Expand All @@ -1496,6 +1510,8 @@ fn typechecked_scalar_value_conversion(
PrimitiveScalarType::IntervalDaytime => {
ScalarValue::IntervalDayTime(None)
}
PrimitiveScalarType::Binary => ScalarValue::Binary(None),
PrimitiveScalarType::LargeBinary => ScalarValue::LargeBinary(None),
};
scalar_value
} else {
Expand Down
8 changes: 8 additions & 0 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ mod roundtrip_tests {
ScalarValue::LargeUtf8(Some(String::from("Test Large utf8"))),
ScalarValue::Date32(Some(0)),
ScalarValue::Date32(Some(i32::MAX)),
ScalarValue::Date32(None),
ScalarValue::Time64(Some(0)),
ScalarValue::Time64(Some(i64::MAX)),
ScalarValue::Time64(None),
ScalarValue::TimestampNanosecond(Some(0), None),
ScalarValue::TimestampNanosecond(Some(i64::MAX), None),
ScalarValue::TimestampNanosecond(Some(0), Some("UTC".to_string())),
Expand Down Expand Up @@ -459,6 +463,10 @@ mod roundtrip_tests {
Box::new(DataType::Int32),
Box::new(ScalarValue::Utf8(None)),
),
ScalarValue::Binary(Some(b"bar".to_vec())),
ScalarValue::Binary(None),
ScalarValue::LargeBinary(Some(b"bar".to_vec())),
ScalarValue::LargeBinary(None),
];

for test_case in should_pass.into_iter() {
Expand Down
40 changes: 22 additions & 18 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampMicrosecond(val, tz) => {
create_proto_scalar(val, PrimitiveScalarType::TimeMicrosecond, |s| {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These names were super confusing because the protobuf definition used Time, while DataType and ScalarValue used Timestamp.

Making it more confusing is that ScalarValue::Time64 is not a timestamp (it is the time of day!)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think TimeMicrosecond stands for a Timestamp whose time unit is TimeUnit::Microsecond, which is why it was named TimeMicrosecond.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming to TimestampMicrosecond LGTM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb do you know of any historical reason this field is still named TimeMillisecond?
https://github.com/apache/arrow-datafusion/blob/master/datafusion/proto/proto/datafusion.proto#L669-L674

i think the original naming comes from here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb do you know of any historical reason this field is still named TimeMillisecond?

I do not know why that field is called TimeMillisecond -- it is called Millisecond in the Arrow schema, so I think we could do the same in DataFusion: https://docs.rs/arrow/23.0.0/arrow/datatypes/enum.TimeUnit.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a PR to make the naming of TimeUnit consistent: #3575

create_proto_scalar(val, PrimitiveScalarType::TimestampMicrosecond, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand All @@ -1110,7 +1110,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampNanosecond(val, tz) => {
create_proto_scalar(val, PrimitiveScalarType::TimeNanosecond, |s| {
create_proto_scalar(val, PrimitiveScalarType::TimestampNanosecond, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand Down Expand Up @@ -1145,7 +1145,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampSecond(val, tz) => {
create_proto_scalar(val, PrimitiveScalarType::TimeSecond, |s| {
create_proto_scalar(val, PrimitiveScalarType::TimestampSecond, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand All @@ -1155,7 +1155,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
})
}
datafusion::scalar::ScalarValue::TimestampMillisecond(val, tz) => {
create_proto_scalar(val, PrimitiveScalarType::TimeMillisecond, |s| {
create_proto_scalar(val, PrimitiveScalarType::TimestampMillisecond, |s| {
Value::TimestampValue(protobuf::ScalarTimestampValue {
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
value: Some(
Expand All @@ -1180,19 +1180,21 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
value: Some(Value::NullValue(PrimitiveScalarType::Null as i32)),
},

datafusion::scalar::ScalarValue::Binary(_) => {
// not yet implemented (TODO file ticket)
return Err(Error::invalid_scalar_value(val));
scalar::ScalarValue::Binary(val) => {
create_proto_scalar(val, PrimitiveScalarType::Binary, |s| {
Value::BinaryValue(s.to_owned())
})
}

datafusion::scalar::ScalarValue::LargeBinary(_) => {
// not yet implemented (TODO file ticket)
return Err(Error::invalid_scalar_value(val));
scalar::ScalarValue::LargeBinary(val) => {
create_proto_scalar(val, PrimitiveScalarType::LargeBinary, |s| {
Value::LargeBinaryValue(s.to_owned())
})
}

datafusion::scalar::ScalarValue::Time64(_) => {
// not yet implemented (TODO file ticket)
return Err(Error::invalid_scalar_value(val));
datafusion::scalar::ScalarValue::Time64(v) => {
create_proto_scalar(v, PrimitiveScalarType::Time64, |v| {
Value::Time64Value(*v)
})
}

datafusion::scalar::ScalarValue::IntervalMonthDayNano(_) => {
Expand Down Expand Up @@ -1335,10 +1337,10 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
DataType::Date32 => Self::Scalar(PrimitiveScalarType::Date32 as i32),
DataType::Time64(time_unit) => match time_unit {
TimeUnit::Microsecond => {
Self::Scalar(PrimitiveScalarType::TimeMicrosecond as i32)
Self::Scalar(PrimitiveScalarType::TimestampMicrosecond as i32)
}
TimeUnit::Nanosecond => {
Self::Scalar(PrimitiveScalarType::TimeNanosecond as i32)
Self::Scalar(PrimitiveScalarType::TimestampNanosecond as i32)
}
_ => {
return Err(Error::invalid_time_unit(time_unit));
Expand Down Expand Up @@ -1379,8 +1381,10 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
DataType::Float64 => PrimitiveScalarType::Float64,
DataType::Date32 => PrimitiveScalarType::Date32,
DataType::Time64(time_unit) => match time_unit {
TimeUnit::Microsecond => PrimitiveScalarType::TimeMicrosecond,
TimeUnit::Nanosecond => PrimitiveScalarType::TimeNanosecond,
TimeUnit::Microsecond => {
PrimitiveScalarType::TimestampMicrosecond
}
TimeUnit::Nanosecond => PrimitiveScalarType::TimestampNanosecond,
_ => {
return Err(Error::invalid_time_unit(time_unit));
}
Expand Down