Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp handling to checkpoint writer #340

Merged
merged 2 commits into from
Aug 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 259 additions & 11 deletions rust/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl CheckPointWriter {
// Collect partition fields along with their data type from the current schema.
// JSON add actions contain a `partitionValues` field which is a map<string, string>.
// When loading `partitionValues_parsed` we have to convert the stringified partition values back to the correct data type.
let data_types: Vec<(&str, &SchemaDataType)> = current_metadata
let partition_col_data_types: Vec<(&str, &SchemaDataType)> = current_metadata
.schema
.get_fields()
.iter()
Expand All @@ -184,6 +184,10 @@ impl CheckPointWriter {
})
.collect();

// Collect a map of paths that require special stats conversion.
let mut stats_conversions: Vec<(SchemaPath, SchemaDataType)> = Vec::new();
collect_stats_conversions(&mut stats_conversions, current_metadata.schema.get_fields());

// protocol
let mut jsons = std::iter::once(action::Action::protocol(action::Protocol {
min_reader_version: state.min_reader_version(),
Expand Down Expand Up @@ -215,19 +219,16 @@ impl CheckPointWriter {
)
.map(|a| serde_json::to_value(a).map_err(ArrowError::from))
// adds
.chain(
state
.files()
.iter()
.map(|f| checkpoint_add_from_state(f, data_types.as_slice())),
);
.chain(state.files().iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
}));

debug!("Preparing checkpoint parquet buffer.");
// Create the arrow schema that represents the Checkpoint parquet file.
let arrow_schema = delta_log_schema_for_table(
<ArrowSchema as TryFrom<&Schema>>::try_from(&current_metadata.schema)?,
current_metadata.partition_columns.as_slice(),
);

debug!("Writing to checkpoint parquet buffer...");
// Write the Checkpoint parquet file.
let writeable_cursor = InMemoryWriteableCursor::default();
Expand All @@ -250,14 +251,15 @@ impl CheckPointWriter {

fn checkpoint_add_from_state(
add: &action::Add,
data_types: &[(&str, &SchemaDataType)],
partition_col_data_types: &[(&str, &SchemaDataType)],
stats_conversions: &[(SchemaPath, SchemaDataType)],
) -> Result<Value, ArrowError> {
let mut v = serde_json::to_value(action::Action::add(add.clone()))?;

if !add.partition_values.is_empty() {
let mut partition_values_parsed: HashMap<String, Value> = HashMap::new();

for (field_name, data_type) in data_types.iter() {
for (field_name, data_type) in partition_col_data_types.iter() {
if let Some(string_value) = add.partition_values.get(*field_name) {
let v = typed_partition_value_from_string(string_value, data_type)?;

Expand All @@ -268,8 +270,24 @@ fn checkpoint_add_from_state(
let partition_values_parsed = serde_json::to_value(partition_values_parsed)?;
v["add"]["partitionValues_parsed"] = partition_values_parsed;
}

if let Ok(Some(stats)) = add.get_stats() {
let stats = serde_json::to_value(stats)?;
let mut stats = serde_json::to_value(stats)?;
let min_values = stats.get_mut("minValues").and_then(|v| v.as_object_mut());

if let Some(min_values) = min_values {
for (path, data_type) in stats_conversions {
apply_stats_conversion(min_values, path.as_slice(), data_type)
}
}

let max_values = stats.get_mut("maxValues").and_then(|v| v.as_object_mut());
if let Some(max_values) = max_values {
for (path, data_type) in stats_conversions {
apply_stats_conversion(max_values, path.as_slice(), data_type)
}
}

v["add"]["stats_parsed"] = stats;
}
Ok(v)
Expand Down Expand Up @@ -306,9 +324,79 @@ fn typed_partition_value_from_string(
}
}

type SchemaPath = Vec<String>;

fn collect_stats_conversions(
paths: &mut Vec<(SchemaPath, SchemaDataType)>,
fields: &[SchemaField],
) {
let mut _path = SchemaPath::new();
fields
.iter()
.for_each(|f| collect_field_conversion(&mut _path, paths, f));
}

fn collect_field_conversion(
current_path: &mut SchemaPath,
all_paths: &mut Vec<(SchemaPath, SchemaDataType)>,
field: &SchemaField,
) {
match field.get_type() {
SchemaDataType::primitive(type_name) => {
if let "timestamp" = type_name.as_str() {
let mut key_path = current_path.clone();
key_path.push(field.get_name().to_owned());
all_paths.push((key_path, field.get_type().to_owned()));
}
}
SchemaDataType::r#struct(struct_field) => {
let struct_fields = struct_field.get_fields();
current_path.push(field.get_name().to_owned());
struct_fields
.iter()
.for_each(|f| collect_field_conversion(current_path, all_paths, f));
current_path.pop();
}
_ => { /* noop */ }
}
}

fn apply_stats_conversion(
context: &mut serde_json::Map<String, Value>,
path: &[String],
data_type: &SchemaDataType,
) {
if path.len() == 1 {
match data_type {
SchemaDataType::primitive(type_name) if type_name == "timestamp" => {
let v = context.get_mut(&path[0]);

if let Some(v) = v {
let ts = v
.as_str()
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| serde_json::Number::from(dt.timestamp_nanos()))
.map(Value::Number);

if let Some(ts) = ts {
*v = ts;
}
}
}
_ => { /* noop */ }
}
} else {
let next_context = context.get_mut(&path[0]).and_then(|v| v.as_object_mut());
if let Some(next_context) = next_context {
apply_stats_conversion(next_context, &path[1..], data_type);
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use lazy_static::lazy_static;

#[test]
fn typed_partition_value_from_string_test() {
Expand Down Expand Up @@ -342,4 +430,164 @@ mod tests {
.unwrap()
);
}

#[test]
fn collect_stats_conversions_test() {
let delta_schema: Schema = serde_json::from_value(SCHEMA.clone()).unwrap();
let fields = delta_schema.get_fields();
let mut paths = Vec::new();

collect_stats_conversions(&mut paths, fields.as_slice());

assert_eq!(2, paths.len());
assert_eq!(
(
vec!["some_struct".to_string(), "struct_timestamp".to_string()],
SchemaDataType::primitive("timestamp".to_string())
),
paths[0]
);
assert_eq!(
(
vec!["some_timestamp".to_string()],
SchemaDataType::primitive("timestamp".to_string())
),
paths[1]
);
}

#[test]
fn apply_stats_conversion_test() {
let mut stats = STATS_JSON.clone();

let min_values = stats.get_mut("minValues").unwrap().as_object_mut().unwrap();

apply_stats_conversion(
min_values,
&["some_struct".to_string(), "struct_string".to_string()],
&SchemaDataType::primitive("string".to_string()),
);
apply_stats_conversion(
min_values,
&["some_struct".to_string(), "struct_timestamp".to_string()],
&SchemaDataType::primitive("timestamp".to_string()),
);
apply_stats_conversion(
min_values,
&["some_string".to_string()],
&SchemaDataType::primitive("string".to_string()),
);
apply_stats_conversion(
min_values,
&["some_timestamp".to_string()],
&SchemaDataType::primitive("timestamp".to_string()),
);

let max_values = stats.get_mut("maxValues").unwrap().as_object_mut().unwrap();

apply_stats_conversion(
max_values,
&["some_struct".to_string(), "struct_string".to_string()],
&SchemaDataType::primitive("string".to_string()),
);
apply_stats_conversion(
max_values,
&["some_struct".to_string(), "struct_timestamp".to_string()],
&SchemaDataType::primitive("timestamp".to_string()),
);
apply_stats_conversion(
max_values,
&["some_string".to_string()],
&SchemaDataType::primitive("string".to_string()),
);
apply_stats_conversion(
max_values,
&["some_timestamp".to_string()],
&SchemaDataType::primitive("timestamp".to_string()),
);

// minValues
assert_eq!(
"A",
stats["minValues"]["some_struct"]["struct_string"]
.as_str()
.unwrap()
);
assert_eq!(
1627668684594000000i64,
stats["minValues"]["some_struct"]["struct_timestamp"]
.as_i64()
.unwrap()
);
assert_eq!("P", stats["minValues"]["some_string"].as_str().unwrap());
assert_eq!(
1627668684594000000i64,
stats["minValues"]["some_timestamp"].as_i64().unwrap()
);

// maxValues
assert_eq!(
"B",
stats["maxValues"]["some_struct"]["struct_string"]
.as_str()
.unwrap()
);
assert_eq!(
1627668685594000000i64,
stats["maxValues"]["some_struct"]["struct_timestamp"]
.as_i64()
.unwrap()
);
assert_eq!("Q", stats["maxValues"]["some_string"].as_str().unwrap());
assert_eq!(
1627668685594000000i64,
stats["maxValues"]["some_timestamp"].as_i64().unwrap()
);
}

lazy_static! {
static ref SCHEMA: Value = json!({
"type": "struct",
"fields": [
{
"name": "some_struct",
"type": {
"type": "struct",
"fields": [
{
"name": "struct_string",
"type": "string",
"nullable": true, "metadata": {}
},
{
"name": "struct_timestamp",
"type": "timestamp",
"nullable": true, "metadata": {}
}]
},
"nullable": true, "metadata": {}
},
{ "name": "some_string", "type": "string", "nullable": true, "metadata": {} },
{ "name": "some_timestamp", "type": "timestamp", "nullable": true, "metadata": {} },
]
});
static ref STATS_JSON: Value = json!({
"minValues": {
"some_struct": {
"struct_string": "A",
"struct_timestamp": "2021-07-30T18:11:24.594Z"
},
"some_string": "P",
"some_timestamp": "2021-07-30T18:11:24.594Z"
},
"maxValues": {
"some_struct": {
"struct_string": "B",
"struct_timestamp": "2021-07-30T18:11:25.594Z"
},
"some_string": "Q",
"some_timestamp": "2021-07-30T18:11:25.594Z"
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"commitInfo":{"timestamp":1626123317718,"userId":"100011","userName":"christianw@scribd.com","operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[\"date\"]","properties":"{}"},"notebook":{"notebookId":"746418"},"clusterId":"0416-131918-rebus430","isolationLevel":"SnapshotIsolation","isBlindAppend":true,"operationMetrics":{}}}
{"commitInfo":{"timestamp":1627668675695,"userId":"100011","userName":"christianw@scribd.com","operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[\"date\"]","properties":"{}"},"notebook":{"notebookId":"746418"},"clusterId":"0416-131918-rebus430","isolationLevel":"SnapshotIsolation","isBlindAppend":true,"operationMetrics":{}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"d8e8815d-718e-4f65-a1ca-4c3c36f91cd2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"some_struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"some_struct_member\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1626123317318}}
{"metaData":{"id":"853536c9-0abe-4e66-9732-1718e542e6aa","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"some_struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"some_struct_member\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"some_struct_timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1627668675432}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"commitInfo":{"timestamp":1626123921334,"userId":"100011","userName":"christianw@scribd.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"746418"},"clusterId":"0416-131918-rebus430","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1284","numOutputRows":"1"}}}
{"add":{"path":"date=2020-06-01/part-00000-1dc7fa4a-28b4-44cb-af1a-481ac26638be.c000.snappy.parquet","partitionValues":{"date":"2020-06-01"},"size":1284,"modificationTime":1626123921000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"some_struct\":{\"some_struct_member\":\"x\"},\"value\":\"x\"},\"maxValues\":{\"some_struct\":{\"some_struct_member\":\"x\"},\"value\":\"x\"},\"nullCount\":{\"some_struct\":{\"some_struct_member\":0},\"some_map\":0,\"value\":0}}"}}
{"commitInfo":{"timestamp":1627668685528,"userId":"100011","userName":"christianw@scribd.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"746418"},"clusterId":"0416-131918-rebus430","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1502","numOutputRows":"1"}}}
{"add":{"path":"date=2020-06-01/part-00000-b207ef5f-4458-4969-bd34-46439cdeb6a6.c000.snappy.parquet","partitionValues":{"date":"2020-06-01"},"size":1502,"modificationTime":1627668686000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"some_struct\":{\"some_struct_member\":\"x\",\"some_struct_timestamp\":\"2021-07-30T18:11:24.594Z\"},\"value\":\"x\",\"ts\":\"2021-07-30T18:11:24.594Z\"},\"maxValues\":{\"some_struct\":{\"some_struct_member\":\"x\",\"some_struct_timestamp\":\"2021-07-30T18:11:24.594Z\"},\"value\":\"x\",\"ts\":\"2021-07-30T18:11:24.594Z\"},\"nullCount\":{\"some_struct\":{\"some_struct_member\":0,\"some_struct_timestamp\":0},\"value\":0,\"ts\":0}}"}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"commitInfo":{"timestamp":1626123925310,"userId":"100011","userName":"christianw@scribd.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"746418"},"clusterId":"0416-131918-rebus430","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1284","numOutputRows":"1"}}}
{"add":{"path":"date=2020-06-01/part-00000-50e5eedb-d665-4289-8838-1cd52212b0d1.c000.snappy.parquet","partitionValues":{"date":"2020-06-01"},"size":1284,"modificationTime":1626123926000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"some_struct\":{\"some_struct_member\":\"x\"},\"value\":\"x\"},\"maxValues\":{\"some_struct\":{\"some_struct_member\":\"x\"},\"value\":\"x\"},\"nullCount\":{\"some_struct\":{\"some_struct_member\":0},\"some_map\":0,\"value\":0}}"}}
{"commitInfo":{"timestamp":1627668687609,"userId":"100011","userName":"christianw@scribd.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"746418"},"clusterId":"0416-131918-rebus430","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1502","numOutputRows":"1"}}}
{"add":{"path":"date=2020-06-01/part-00000-762e2b03-6a04-4707-b676-5d38d1ef9fca.c000.snappy.parquet","partitionValues":{"date":"2020-06-01"},"size":1502,"modificationTime":1627668688000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"some_struct\":{\"some_struct_member\":\"x\",\"some_struct_timestamp\":\"2021-07-30T18:11:27.001Z\"},\"value\":\"x\",\"ts\":\"2021-07-30T18:11:27.001Z\"},\"maxValues\":{\"some_struct\":{\"some_struct_member\":\"x\",\"some_struct_timestamp\":\"2021-07-30T18:11:27.001Z\"},\"value\":\"x\",\"ts\":\"2021-07-30T18:11:27.001Z\"},\"nullCount\":{\"some_struct\":{\"some_struct_member\":0,\"some_struct_timestamp\":0},\"value\":0,\"ts\":0}}"}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"commitInfo":{"timestamp":1626123928572,"userId":"100011","userName":"christianw@scribd.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"746418"},"clusterId":"0416-131918-rebus430","readVersion":2,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1284","numOutputRows":"1"}}}
{"add":{"path":"date=2020-06-01/part-00000-e2ff04ef-a19f-4ebe-84a6-af74babc35d1.c000.snappy.parquet","partitionValues":{"date":"2020-06-01"},"size":1284,"modificationTime":1626123929000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"some_struct\":{\"some_struct_member\":\"x\"},\"value\":\"x\"},\"maxValues\":{\"some_struct\":{\"some_struct_member\":\"x\"},\"value\":\"x\"},\"nullCount\":{\"some_struct\":{\"some_struct_member\":0},\"some_map\":0,\"value\":0}}"}}
{"commitInfo":{"timestamp":1627668691354,"userId":"100011","userName":"christianw@scribd.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"746418"},"clusterId":"0416-131918-rebus430","readVersion":2,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1502","numOutputRows":"1"}}}
{"add":{"path":"date=2020-06-01/part-00000-ca044815-1585-4e08-aa15-904c5ec961e6.c000.snappy.parquet","partitionValues":{"date":"2020-06-01"},"size":1502,"modificationTime":1627668691000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"some_struct\":{\"some_struct_member\":\"x\",\"some_struct_timestamp\":\"2021-07-30T18:11:29.919Z\"},\"value\":\"x\",\"ts\":\"2021-07-30T18:11:29.919Z\"},\"maxValues\":{\"some_struct\":{\"some_struct_member\":\"x\",\"some_struct_timestamp\":\"2021-07-30T18:11:29.919Z\"},\"value\":\"x\",\"ts\":\"2021-07-30T18:11:29.919Z\"},\"nullCount\":{\"some_struct\":{\"some_struct_member\":0,\"some_struct_timestamp\":0},\"value\":0,\"ts\":0}}"}}
Loading