Skip to content

Commit

Permalink
Support partition value string deserialization for timestamp/binary (#371)
Browse files Browse the repository at this point in the history

* Add partition value string deserialization for timestamp.

* Add partition value string deserialization for binary.

* Parse timestamp partition value to microseconds.

* Bump up Arrow version.
  • Loading branch information
zijie0 authored Sep 15, 2021
1 parent 3232743 commit 58dcb1e
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 9 deletions.
72 changes: 68 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ env_logger = "0"
# for binary wheel best practice, statically link openssl
reqwest = { version = "*", features = ["native-tls-vendored"] }
serde_json = "1"
arrow = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" }
arrow = { git = "https://github.com/apache/arrow-rs", rev = "5c3ed6123d9ea0130a1eca95a0aae776b458208f" }
chrono = "0"

[dependencies.pyo3]
Expand All @@ -31,4 +31,3 @@ features = ["extension-module", "abi3", "abi3-py36"]
path = "../rust"
version = "0"
features = ["s3", "azure", "glue"]

4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ async-stream = { version = "0.3.2", default-features = true, optional = true }
# High-level writer
parquet-format = "~2.6.1"

arrow = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" }
parquet = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" }
arrow = { git = "https://github.com/apache/arrow-rs", rev = "5c3ed6123d9ea0130a1eca95a0aae776b458208f" }
parquet = { git = "https://github.com/apache/arrow-rs", rev = "5c3ed6123d9ea0130a1eca95a0aae776b458208f" }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "4ddd2f5e7582ffe662aea27bbb74c58cd0715152", optional = true }

crossbeam = { version = "0", optional = true }
Expand Down
38 changes: 37 additions & 1 deletion rust/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ fn typed_partition_value_from_string(
) -> Result<Value, CheckpointError> {
match data_type {
SchemaDataType::primitive(primitive_type) => match primitive_type.as_str() {
"string" => Ok(string_value.to_owned().into()),
"string" | "binary" => Ok(string_value.to_owned().into()),
"long" | "integer" | "short" | "byte" => Ok(string_value
.parse::<i64>()
.map_err(|_| CheckpointError::PartitionValueNotParseable(string_value.to_owned()))?
Expand All @@ -312,6 +312,14 @@ fn typed_partition_value_from_string(
// day 0 is 1970-01-01 (719163 days from ce)
Ok((d.num_days_from_ce() - 719_163).into())
}
"timestamp" => {
let ts =
chrono::naive::NaiveDateTime::parse_from_str(string_value, "%Y-%m-%d %H:%M:%S")
.map_err(|_| {
CheckpointError::PartitionValueNotParseable(string_value.to_owned())
})?;
Ok((ts.timestamp_millis() * 1000).into())
}
s => unimplemented!(
"Primitive type {} is not supported for partition column values.",
s
Expand Down Expand Up @@ -462,6 +470,34 @@ mod tests {
.unwrap()
);
}

for (s, v) in [
("2021-08-08 01:00:01", 1628384401000000i64),
("1970-01-02 12:59:59", 133199000000i64),
("1970-01-01 13:00:01", 46801000000i64),
("1969-12-31 00:00:00", -86400000000i64),
("1677-09-21 00:12:44", -9223372036000000i64),
] {
let timestamp_value: Value = v.into();
assert_eq!(
timestamp_value,
typed_partition_value_from_option_string(
&Some(s.to_string()),
&SchemaDataType::primitive("timestamp".to_string()),
)
.unwrap()
);
}

let binary_value: Value = "\u{2081}\u{2082}\u{2083}\u{2084}".into();
assert_eq!(
binary_value,
typed_partition_value_from_option_string(
&Some("₁₂₃₄".to_string()),
&SchemaDataType::primitive("binary".to_string()),
)
.unwrap()
);
}

#[test]
Expand Down

0 comments on commit 58dcb1e

Please sign in to comment.