fix: replace deprecated arrow::json::reader::Decoder #1226

Merged (8 commits) on May 12, 2023
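In short, the PR replaces the deprecated `arrow::json::reader::Decoder` / `DecoderOptions` pair with `arrow::json::ReaderBuilder`, whose decoder accepts either already-parsed `serde_json::Value`s via `serialize` or raw JSON bytes via `decode`, and is drained with `flush`. The following is a minimal sketch of the value-based pattern, assuming the arrow 38 API and a made-up two-column schema; it is illustrative only and not code from this PR.

```rust
// Minimal sketch of the new arrow JSON decoding pattern (illustrative only;
// assumes the `arrow` 38 and `serde_json` crates and a made-up schema).
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::json::ReaderBuilder;
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("value", DataType::Int64, false),
    ]));

    // Rows as already-parsed JSON values, as produced by serde_json.
    let rows = vec![
        json!({"id": "a", "value": 1}),
        json!({"id": "b", "value": 2}),
    ];

    // Old (deprecated) API: Decoder::new(schema, DecoderOptions) + next_batch(iter).
    // New API: build a Decoder, push rows with `serialize`, drain with `flush`.
    let mut decoder = ReaderBuilder::new(schema)
        .with_batch_size(1024)
        .build_decoder()?;
    decoder.serialize(&rows)?;

    if let Some(batch) = decoder.flush()? {
        println!("decoded {} rows", batch.num_rows());
    }
    Ok(())
}
```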
4 changes: 2 additions & 2 deletions python/tests/test_checkpoint.py
@@ -12,8 +12,8 @@ def test_checkpoint(tmp_path: pathlib.Path, sample_data: pa.Table):
tmp_table_path / "_delta_log" / "00000000000000000000.checkpoint.parquet"
)

# TODO: Include decimal after fixing issue "Json error: Decimal128(5, 3) type is not supported"
sample_data = sample_data.drop(["decimal"])
# TODO: Include binary after fixing issue "Json error: binary type is not supported"
sample_data = sample_data.drop(["binary"])
write_deltalake(str(tmp_table_path), sample_data)

assert not checkpoint_path.exists()
8 changes: 5 additions & 3 deletions python/tests/test_schema.py
@@ -1,3 +1,5 @@
import json

import pyarrow
import pytest

@@ -9,7 +11,7 @@ def test_table_schema():
table_path = "../rust/tests/data/simple_table"
dt = DeltaTable(table_path)
schema = dt.schema()
assert schema.json() == {
assert json.loads(schema.to_json()) == {
@wjones127 (Collaborator) commented on May 12, 2023:
I'm confused. This seems like an API-breaking change? What happened? nevermind I can't read. Looks good 👍

"fields": [{"metadata": {}, "name": "id", "nullable": True, "type": "long"}],
"type": "struct",
}
@@ -20,8 +22,8 @@ def test_table_schema():
assert field.nullable is True
assert field.metadata == {}

json = '{"type":"struct","fields":[{"name":"x","type":{"type":"array","elementType":"long","containsNull":true},"nullable":true,"metadata":{}}]}'
schema = Schema.from_json(json)
json_buf = '{"type":"struct","fields":[{"name":"x","type":{"type":"array","elementType":"long","containsNull":true},"nullable":true,"metadata":{}}]}'
schema = Schema.from_json(json_buf)
assert schema.fields[0] == Field(
"x", ArrayType(PrimitiveType("long"), True), True, {}
)
22 changes: 11 additions & 11 deletions rust/Cargo.toml
@@ -13,10 +13,10 @@ readme = "README.md"
edition = "2021"

[dependencies]
arrow = { version = "37", optional = true }
arrow-array = { version = "37", optional = true }
arrow-cast = { version = "37", optional = true }
arrow-schema = { version = "37", optional = true }
arrow = { version = "38", optional = true }
arrow-array = { version = "38", optional = true }
arrow-cast = { version = "38", optional = true }
arrow-schema = { version = "38", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
@@ -36,7 +36,7 @@ num-traits = "0.2.15"
object_store = "0.5.6"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "37", features = [
parquet = { version = "38", features = [
"async",
"object_store",
], optional = true }
@@ -65,12 +65,12 @@ reqwest-middleware = { version = "0.2.1", optional = true }
reqwest-retry = { version = "0.2.2", optional = true }

# Datafusion
datafusion = { version = "23", optional = true }
datafusion-expr = { version = "23", optional = true }
datafusion-common = { version = "23", optional = true }
datafusion-proto = { version = "23", optional = true }
datafusion-sql = { version = "23", optional = true }
datafusion-physical-expr = { version = "23", optional = true }
datafusion = { version = "24", optional = true }
datafusion-expr = { version = "24", optional = true }
datafusion-common = { version = "24", optional = true }
datafusion-proto = { version = "24", optional = true }
datafusion-sql = { version = "24", optional = true }
datafusion-physical-expr = { version = "24", optional = true }

sqlparser = { version = "0.33", optional = true }

27 changes: 16 additions & 11 deletions rust/src/checkpoints.rs
@@ -1,11 +1,8 @@
//! Implementation for writing delta checkpoints.

use arrow::datatypes::Schema as ArrowSchema;
// NOTE: Temporarily allowing these deprecated imports pending the completion of:
// <https://github.com/apache/arrow-rs/pull/3979>
use arrow::error::ArrowError;
#[allow(deprecated)]
use arrow::json::reader::{Decoder, DecoderOptions};
use arrow::json::ReaderBuilder;
use chrono::{DateTime, Datelike, Duration, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
@@ -75,6 +72,13 @@ pub enum CheckpointError {
#[from]
source: serde_json::Error,
},
/// Passthrough error returned when doing std::io operations
#[error("std::io::Error: {source}")]
Io {
/// The source std::io::Error
#[from]
source: std::io::Error,
},
}

/// The record batch size for checkpoint parquet file
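The new `Io` variant relies on thiserror's `#[from]`, which derives a `From<std::io::Error>` impl so the `?` operator converts I/O errors into `CheckpointError` automatically. A small illustrative sketch with a hypothetical helper function (not from this PR; the real enum has more variants):

```rust
// Sketch: thiserror's #[from] derives From<std::io::Error> for the enum,
// so `?` converts io errors into CheckpointError automatically (illustrative only).
use thiserror::Error;

#[derive(Debug, Error)]
enum CheckpointError {
    /// Passthrough error returned when doing std::io operations
    #[error("std::io::Error: {source}")]
    Io {
        #[from]
        source: std::io::Error,
    },
}

// Hypothetical helper: an io::Error from `write_all` is converted into
// CheckpointError::Io by the `?` operator via the derived From impl.
fn write_bytes(buf: &mut impl std::io::Write, bytes: &[u8]) -> Result<(), CheckpointError> {
    buf.write_all(bytes)?;
    Ok(())
}

fn main() -> Result<(), CheckpointError> {
    let mut out: Vec<u8> = Vec::new();
    write_bytes(&mut out, b"checkpoint")?;
    assert_eq!(out, b"checkpoint".to_vec());
    Ok(())
}
```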
@@ -302,9 +306,6 @@ pub async fn cleanup_expired_logs_for(
}
}

// NOTE: Temporarily allowing these deprecated imports pending the completion of:
// <https://github.com/apache/arrow-rs/pull/3979>
#[allow(deprecated)]
fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, CheckpointError> {
let current_metadata = state
.current_metadata()
@@ -338,7 +339,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Che
}

// protocol
let mut jsons = std::iter::once(action::Action::protocol(action::Protocol {
let jsons = std::iter::once(action::Action::protocol(action::Protocol {
min_reader_version: state.min_reader_version(),
min_writer_version: state.min_writer_version(),
}))
@@ -388,12 +389,16 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Che
// Write the Checkpoint parquet file.
let mut bytes = vec![];
let mut writer = ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), None)?;
let mut decoder = ReaderBuilder::new(arrow_schema)
.with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
.build_decoder()?;
let jsons: Vec<serde_json::Value> = jsons.map(|r| r.unwrap()).collect();
decoder.serialize(&jsons)?;

let options = DecoderOptions::new().with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE);
let decoder = Decoder::new(arrow_schema, options);
while let Some(batch) = decoder.next_batch(&mut jsons)? {
while let Some(batch) = decoder.flush()? {
writer.write(&batch)?;
}

let _ = writer.close()?;
debug!("Finished writing checkpoint parquet buffer.");

2 changes: 1 addition & 1 deletion rust/src/writer/json.rs
@@ -473,7 +473,7 @@ mod tests {
let data = serde_json::json!(
{
"id" : "A",
"value": "test",
"value": 42,
"modified": "2021-02-01"
}
);
76 changes: 76 additions & 0 deletions rust/src/writer/record_batch.rs
@@ -249,6 +249,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
}

/// Helper container for partitioned record batches
#[derive(Clone, Debug)]
pub struct PartitionResult {
/// values found in partition columns
pub partition_values: HashMap<String, Option<String>>,
@@ -416,6 +417,7 @@ mod tests {
test_utils::{create_initialized_table, get_record_batch},
utils::PartitionPath,
};
use arrow::json::ReaderBuilder;
use std::path::Path;

#[tokio::test]
@@ -447,6 +449,80 @@
validate_partition_map(partitions, &partition_cols, expected_keys)
}

/*
* This test is a little messy but demonstrates a bug when
* trying to write data to a Delta Table that has a map column and partition columns
*
* For readability the schema and data for the write are defined in JSON
*/
#[tokio::test]
async fn test_divide_record_batch_with_map_single_partition() {
use crate::{DeltaOps, SchemaTypeStruct};

let table = crate::writer::test_utils::create_bare_table();
let partition_cols = vec!["modified".to_string()];
let delta_schema = r#"
{"type" : "struct",
"fields" : [
{"name" : "id", "type" : "string", "nullable" : false, "metadata" : {}},
{"name" : "value", "type" : "integer", "nullable" : false, "metadata" : {}},
{"name" : "modified", "type" : "string", "nullable" : false, "metadata" : {}},
{"name" : "metadata", "type" :
{"type" : "map", "keyType" : "string", "valueType" : "string", "valueContainsNull" : true},
"nullable" : false, "metadata" : {}}
]
}"#;

let delta_schema: SchemaTypeStruct =
serde_json::from_str(delta_schema).expect("Failed to parse schema");

let table = DeltaOps(table)
.create()
.with_partition_columns(partition_cols.to_vec())
.with_columns(delta_schema.get_fields().clone())
.await
.unwrap();

let buf = r#"
{"id" : "0xdeadbeef", "value" : 42, "modified" : "2021-02-01",
"metadata" : {"some-key" : "some-value"}}
{"id" : "0xdeadcaf", "value" : 3, "modified" : "2021-02-02",
"metadata" : {"some-key" : "some-value"}}"#
.as_bytes();

let schema: ArrowSchema =
<ArrowSchema as TryFrom<&Schema>>::try_from(&delta_schema).unwrap();

// Using a batch size of two since the buf above only has two records
let mut decoder = ReaderBuilder::new(Arc::new(schema))
.with_batch_size(2)
.build_decoder()
.expect("Failed to build decoder");

decoder
.decode(buf)
.expect("Failed to deserialize the JSON in the buffer");
let batch = decoder.flush().expect("Failed to flush").unwrap();

let mut writer = RecordBatchWriter::for_table(&table).unwrap();
let partitions = writer.divide_by_partition_values(&batch).unwrap();
println!("partitions: {:?}", partitions);

let expected_keys = vec![
String::from("modified=2021-02-01"),
String::from("modified=2021-02-02"),
];

assert_eq!(partitions.len(), expected_keys.len());
for result in partitions {
let partition_key =
PartitionPath::from_hashmap(&partition_cols, &result.partition_values)
.unwrap()
.into();
assert!(expected_keys.contains(&partition_key));
}
}

#[tokio::test]
async fn test_divide_record_batch_multiple_partitions() {
let batch = get_record_batch(None, false);
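As a side note to the new test above: it exercises the byte-stream side of the same decoder, feeding newline-delimited JSON through `decode` rather than pre-parsed values through `serialize`. A standalone sketch of that path, again assuming the arrow 38 API and a schema invented for illustration (not code from this PR):

```rust
// Sketch of the byte-stream path used by the test above (illustrative only;
// assumes the `arrow` 38 crate and a made-up schema).
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::json::ReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("value", DataType::Int64, false),
    ]));

    // Newline-delimited JSON, analogous to the buffer in the test.
    let buf: &[u8] = br#"{"id": "a", "value": 1}
{"id": "b", "value": 2}"#;

    let mut decoder = ReaderBuilder::new(schema)
        .with_batch_size(2)
        .build_decoder()?;

    // `decode` consumes raw bytes (returning how many were read);
    // `flush` yields whatever RecordBatch has been buffered so far.
    decoder.decode(buf)?;
    if let Some(batch) = decoder.flush()? {
        assert_eq!(batch.num_rows(), 2);
    }
    Ok(())
}
```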
21 changes: 7 additions & 14 deletions rust/src/writer/utils.rs
@@ -5,7 +5,7 @@ use std::io::Write;
use std::sync::Arc;

use crate::writer::DeltaWriterError;
use crate::DeltaTableError;
use crate::DeltaResult;

use arrow::array::{
as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array,
@@ -16,10 +16,7 @@ use arrow::datatypes::{
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
};
// NOTE: Temporarily allowing these deprecated imports pending the completion of:
// <https://github.com/apache/arrow-rs/pull/3979>
#[allow(deprecated)]
use arrow::json::reader::{Decoder, DecoderOptions};
use arrow::json::ReaderBuilder;
use arrow::record_batch::*;
use object_store::path::Path;
use parking_lot::RwLock;
@@ -107,19 +104,15 @@ pub(crate) fn next_data_path(
Ok(Path::from(format!("{partition_key}/{file_name}")))
}

// NOTE: Temporarily allowing these deprecated imports pending the completion of:
// <https://github.com/apache/arrow-rs/pull/3979>
#[allow(deprecated)]
/// Convert a vector of json values to a RecordBatch
pub fn record_batch_from_message(
arrow_schema: Arc<ArrowSchema>,
message_buffer: &[Value],
) -> Result<RecordBatch, DeltaTableError> {
let mut value_iter = message_buffer.iter().map(|j| Ok(j.to_owned()));
let options = DecoderOptions::new().with_batch_size(message_buffer.len());
let decoder = Decoder::new(arrow_schema, options);
json: &[Value],
) -> DeltaResult<RecordBatch> {
let mut decoder = ReaderBuilder::new(arrow_schema).build_decoder().unwrap();
decoder.serialize(json)?;
decoder
.next_batch(&mut value_iter)?
.flush()?
.ok_or_else(|| DeltaWriterError::EmptyRecordBatch.into())
}
