fix: serde issues and tests
roeap committed Oct 24, 2023
1 parent 53f1c3d commit 22325df
Showing 29 changed files with 531 additions and 139 deletions.
28 changes: 13 additions & 15 deletions rust/examples/basic_operations.rs
@@ -1,47 +1,45 @@
use arrow::{
array::{Int32Array, StringArray, TimestampMicrosecondArray},
datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit},
datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit},
record_batch::RecordBatch,
};
use deltalake::kernel::{DataType, PrimitiveType, StructField};
use deltalake::operations::collect_sendable_stream;
use deltalake::{protocol::SaveMode, DeltaOps, SchemaDataType, SchemaField};
use deltalake::{protocol::SaveMode, DeltaOps};
use parquet::{
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
};

use std::sync::Arc;

fn get_table_columns() -> Vec<SchemaField> {
fn get_table_columns() -> Vec<StructField> {
vec![
SchemaField::new(
StructField::new(
String::from("int"),
SchemaDataType::primitive(String::from("integer")),
DataType::Primitive(PrimitiveType::Integer),
false,
Default::default(),
),
SchemaField::new(
StructField::new(
String::from("string"),
SchemaDataType::primitive(String::from("string")),
DataType::Primitive(PrimitiveType::String),
true,
Default::default(),
),
SchemaField::new(
StructField::new(
String::from("timestamp"),
SchemaDataType::primitive(String::from("timestamp")),
DataType::Primitive(PrimitiveType::Timestamp),
true,
Default::default(),
),
]
}

fn get_table_batches() -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(vec![
Field::new("int", DataType::Int32, false),
Field::new("string", DataType::Utf8, true),
Field::new("int", ArrowDataType::Int32, false),
Field::new("string", ArrowDataType::Utf8, true),
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Microsecond, None),
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
]));
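The change above replaces the old `SchemaField`/`SchemaDataType` helpers with the kernel schema types and imports Arrow's `DataType` under an alias so it no longer collides with `deltalake::kernel::DataType`. A minimal standalone sketch of the new pattern, assuming the `deltalake` crate as of this commit; the `main` function and column names are illustrative only:

```rust
use deltalake::arrow::datatypes::{DataType as ArrowDataType, Field};
use deltalake::kernel::{DataType, PrimitiveType, StructField};

fn main() {
    // Delta column built with the kernel types: three-argument constructor,
    // no trailing metadata map as the old SchemaField::new required.
    let delta_col = StructField::new(
        String::from("int"),
        DataType::Primitive(PrimitiveType::Integer),
        false,
    );

    // Arrow's DataType is imported under an alias so it can live alongside
    // deltalake::kernel::DataType in the same file.
    let arrow_col = Field::new("int", ArrowDataType::Int32, false);

    let columns: Vec<StructField> = vec![delta_col];
    let fields: Vec<Field> = vec![arrow_col];
    println!("{} delta column(s), {} arrow field(s)", columns.len(), fields.len());
}
```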
27 changes: 11 additions & 16 deletions rust/examples/recordbatch-writer.rs
@@ -11,6 +11,7 @@ use chrono::prelude::*;
use deltalake::arrow::array::*;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::errors::DeltaTableError;
use deltalake::kernel::{DataType, PrimitiveType, StructField, StructType};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::*;
use log::*;
@@ -19,8 +20,6 @@ use parquet::{
basic::{Compression, ZstdLevel},
file::properties::WriterProperties,
};

use std::collections::HashMap;
use std::sync::Arc;

/*
@@ -86,31 +85,27 @@ struct WeatherRecord {
}

impl WeatherRecord {
fn columns() -> Vec<SchemaField> {
fn columns() -> Vec<StructField> {
vec![
SchemaField::new(
StructField::new(
"timestamp".to_string(),
SchemaDataType::primitive("timestamp".to_string()),
DataType::Primitive(PrimitiveType::Timestamp),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"temp".to_string(),
SchemaDataType::primitive("integer".to_string()),
DataType::Primitive(PrimitiveType::Integer),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"lat".to_string(),
SchemaDataType::primitive("double".to_string()),
DataType::Primitive(PrimitiveType::Float),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"long".to_string(),
SchemaDataType::primitive("double".to_string()),
DataType::Primitive(PrimitiveType::Float),
true,
HashMap::new(),
),
]
}
@@ -167,7 +162,7 @@ fn convert_to_batch(table: &DeltaTable, records: &Vec<WeatherRecord>) -> RecordB
let metadata = table
.get_metadata()
.expect("Failed to get metadata for the table");
let arrow_schema = <deltalake::arrow::datatypes::Schema as TryFrom<&Schema>>::try_from(
let arrow_schema = <deltalake::arrow::datatypes::Schema as TryFrom<&StructType>>::try_from(
&metadata.schema.clone(),
)
.expect("Failed to convert to arrow schema");
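The writer example now names the kernel `StructType` explicitly when converting the table schema to Arrow. A minimal sketch of that conversion, assuming the `TryFrom<&StructType>` impl referenced in the diff, the `serde_json` dependency used by the tests below, and illustrative column names:

```rust
use deltalake::arrow::datatypes::Schema as ArrowSchema;
use deltalake::kernel::StructType;
use serde_json::json;

fn main() {
    // A kernel schema deserialized from its Delta JSON representation,
    // the same form the vacuum tests further down use.
    let delta_schema: StructType = serde_json::from_value(json!({
        "type": "struct",
        "fields": [
            {"name": "timestamp", "type": "timestamp", "nullable": true, "metadata": {}},
            {"name": "temp", "type": "integer", "nullable": true, "metadata": {}}
        ]
    }))
    .expect("valid Delta schema JSON");

    // The conversion the example relies on: Arrow schema built from the kernel type.
    let arrow_schema = ArrowSchema::try_from(&delta_schema)
        .expect("kernel schema should map onto an Arrow schema");
    println!("{} arrow field(s)", arrow_schema.fields().len());
}
```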
1 change: 1 addition & 0 deletions rust/src/kernel/actions/mod.rs
@@ -52,6 +52,7 @@ pub enum ActionType {
#[serde(rename_all = "camelCase")]
#[allow(missing_docs)]
pub enum Action {
#[serde(rename = "metaData")]
Metadata(Metadata),
Protocol(Protocol),
Add(Add),
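With `rename_all = "camelCase"` on the enum, the `Metadata` variant would otherwise serialize as `"metadata"`, while the Delta log spells the action key `"metaData"`; the explicit variant rename restores that spelling. A standalone sketch of the behaviour using simplified stand-in types (string payloads rather than the real deltalake structs), assuming `serde` and `serde_json` are available:

```rust
use serde::{Deserialize, Serialize};

// Simplified stand-in for the Action enum; payloads are plain strings here.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
enum Action {
    // Without this attribute, camelCase renaming would produce "metadata".
    #[serde(rename = "metaData")]
    Metadata(String),
    Protocol(String),
}

fn main() {
    // Externally tagged enums serialize newtype variants as {"variant": value}.
    let json = serde_json::to_string(&Action::Metadata("m".into())).unwrap();
    assert_eq!(json, r#"{"metaData":"m"}"#);

    let back: Action = serde_json::from_str(r#"{"protocol":"p"}"#).unwrap();
    assert_eq!(back, Action::Protocol("p".into()));
}
```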
5 changes: 4 additions & 1 deletion rust/src/kernel/actions/types.rs
@@ -4,8 +4,8 @@ use std::str::FromStr;
// use std::sync::Arc;

// use roaring::RoaringTreemap;
use log::warn;
use serde::{Deserialize, Serialize};
use tracing::warn;
use url::Url;

use super::super::schema::StructType;
@@ -105,6 +105,7 @@ impl Metadata {
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
/// Defines a protocol action
pub struct Protocol {
/// The minimum version of the Delta read protocol that a client must implement
@@ -203,6 +204,7 @@ impl ToString for StorageType {
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
/// Defines a deletion vector
pub struct DeletionVectorDescriptor {
/// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p'].
@@ -336,6 +338,7 @@ impl DeletionVectorDescriptor {
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Defines an add action
pub struct Add {
/// A relative path to a data file from the root of the table or an absolute path to a file
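Adding `#[serde(rename_all = "camelCase")]` to `Protocol`, `DeletionVectorDescriptor`, and `Add` makes their snake_case Rust fields line up with the camelCase keys used in the Delta log. A standalone sketch with a trimmed-down stand-in for `Protocol` (the real struct carries more fields), again assuming `serde` and `serde_json`:

```rust
use serde::{Deserialize, Serialize};

// Trimmed-down stand-in: snake_case fields map onto camelCase JSON keys.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
struct Protocol {
    min_reader_version: i32,
    min_writer_version: i32,
}

fn main() {
    let p: Protocol =
        serde_json::from_str(r#"{"minReaderVersion":1,"minWriterVersion":2}"#).unwrap();
    assert_eq!(
        p,
        Protocol {
            min_reader_version: 1,
            min_writer_version: 2
        }
    );
}
```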
12 changes: 8 additions & 4 deletions rust/tests/checkpoint_writer.rs
@@ -211,7 +211,7 @@ mod checkpoints_with_tombstones {
use super::*;
use ::object_store::path::Path as ObjectStorePath;
use chrono::Utc;
use deltalake::protocol::*;
use deltalake::kernel::*;
use deltalake::table::config::DeltaConfigKey;
use deltalake::*;
use maplit::hashmap;
@@ -346,6 +348,8 @@ mod checkpoints_with_tombstones {
size: None,
tags: None,
deletion_vector: None,
base_row_id: None,
default_row_commit_version: None,
})
.collect();

@@ -357,8 +359,8 @@
let actions = removes
.iter()
.cloned()
.map(Action::remove)
.chain(std::iter::once(Action::add(add.clone())))
.map(Action::Remove)
.chain(std::iter::once(Action::Add(add.clone())))
.collect();
let operation = DeltaOperation::Optimize {
predicate: None,
@@ -389,7 +391,7 @@
let actions = actions
.iter()
.filter_map(|a| match a {
Action::remove(r) => Some(r.clone()),
Action::Remove(r) => Some(r.clone()),
_ => None,
})
.collect();
@@ -408,6 +410,8 @@
size: Some(100),
tags: None,
deletion_vector: None,
base_row_id: None,
default_row_commit_version: None,
}
}

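The tests now use the enum's PascalCase tuple variants (`Action::Remove`, `Action::Add`) instead of the old lowercase constructor-style names; since tuple variants are themselves functions, they can still be passed straight to `.map`. A standalone sketch with simplified stand-in types (the real `Remove`/`Add` carry many more fields):

```rust
// Simplified stand-in types, not the deltalake kernel structs.
#[derive(Debug, Clone)]
struct Remove {
    path: String,
}

#[derive(Debug, Clone)]
struct Add {
    path: String,
}

#[derive(Debug)]
enum Action {
    Remove(Remove),
    Add(Add),
}

fn main() {
    let removes = vec![
        Remove { path: "part-0.parquet".into() },
        Remove { path: "part-1.parquet".into() },
    ];
    let add = Add { path: "part-2.parquet".into() };

    // Tuple variants double as constructor functions, so `.map(Action::Remove)` works.
    let actions: Vec<Action> = removes
        .iter()
        .cloned()
        .map(Action::Remove)
        .chain(std::iter::once(Action::Add(add)))
        .collect();

    // Filtering back out mirrors the updated test: match on the variant name.
    let only_removes: Vec<Remove> = actions
        .iter()
        .filter_map(|a| match a {
            Action::Remove(r) => Some(r.clone()),
            _ => None,
        })
        .collect();

    assert_eq!(only_removes.len(), 2);
    assert!(matches!(actions.last(), Some(Action::Add(a)) if a.path == "part-2.parquet"));
}
```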
38 changes: 19 additions & 19 deletions rust/tests/command_optimize.rs
@@ -4,16 +4,17 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, error::Error, sync::Arc};

use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
use arrow_select::concat::concat_batches;
use deltalake::errors::DeltaTableError;
use deltalake::kernel::{Action, DataType, PrimitiveType, Remove, StructField};
use deltalake::operations::optimize::{create_merge_plan, MetricDetails, Metrics, OptimizeType};
use deltalake::operations::transaction::commit;
use deltalake::operations::DeltaOps;
use deltalake::protocol::{Action, DeltaOperation, Remove};
use deltalake::protocol::DeltaOperation;
use deltalake::storage::ObjectStoreRef;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::{DeltaTable, PartitionFilter, Path, SchemaDataType, SchemaField};
use deltalake::{DeltaTable, PartitionFilter, Path};
use futures::TryStreamExt;
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetObjectReader;
@@ -30,23 +31,20 @@ struct Context {

async fn setup_test(partitioned: bool) -> Result<Context, Box<dyn Error>> {
let columns = vec![
SchemaField::new(
StructField::new(
"x".to_owned(),
SchemaDataType::primitive("integer".to_owned()),
DataType::Primitive(PrimitiveType::Integer),
false,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"y".to_owned(),
SchemaDataType::primitive("integer".to_owned()),
DataType::Primitive(PrimitiveType::Integer),
false,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"date".to_owned(),
SchemaDataType::primitive("string".to_owned()),
DataType::Primitive(PrimitiveType::String),
false,
HashMap::new(),
),
];

@@ -90,9 +88,9 @@ fn generate_random_batch<T: Into<String>>(

Ok(RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![
Field::new("x", DataType::Int32, false),
Field::new("y", DataType::Int32, false),
Field::new("date", DataType::Utf8, false),
Field::new("x", ArrowDataType::Int32, false),
Field::new("y", ArrowDataType::Int32, false),
Field::new("date", ArrowDataType::Utf8, false),
])),
vec![Arc::new(x_array), Arc::new(y_array), Arc::new(date_array)],
)?)
@@ -119,9 +117,9 @@ fn tuples_to_batch<T: Into<String>>(

Ok(RecordBatch::try_new(
Arc::new(ArrowSchema::new(vec![
Field::new("x", DataType::Int32, false),
Field::new("y", DataType::Int32, false),
Field::new("date", DataType::Utf8, false),
Field::new("x", ArrowDataType::Int32, false),
Field::new("y", ArrowDataType::Int32, false),
Field::new("date", ArrowDataType::Utf8, false),
])),
vec![Arc::new(x_array), Arc::new(y_array), Arc::new(date_array)],
)?)
@@ -292,12 +290,14 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
partition_values: Some(add.partition_values.clone()),
tags: Some(HashMap::new()),
deletion_vector: add.deletion_vector.clone(),
base_row_id: add.base_row_id,
default_row_commit_version: add.default_row_commit_version,
};

let operation = DeltaOperation::Delete { predicate: None };
commit(
other_dt.object_store().as_ref(),
&vec![Action::remove(remove)],
&vec![Action::Remove(remove)],
operation,
&other_dt.state,
None,
20 changes: 9 additions & 11 deletions rust/tests/command_restore.rs
@@ -2,12 +2,12 @@

use arrow::datatypes::Schema as ArrowSchema;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field};
use arrow_schema::{DataType as ArrowDataType, Field};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use deltalake::kernel::{DataType, PrimitiveType, StructField};
use deltalake::protocol::SaveMode;
use deltalake::{DeltaOps, DeltaTable, SchemaDataType, SchemaField};
use deltalake::{DeltaOps, DeltaTable};
use rand::Rng;
use std::collections::HashMap;
use std::error::Error;
use std::fs;
use std::sync::Arc;
@@ -21,17 +21,15 @@ struct Context {

async fn setup_test() -> Result<Context, Box<dyn Error>> {
let columns = vec![
SchemaField::new(
StructField::new(
"id".to_string(),
SchemaDataType::primitive("integer".to_string()),
DataType::Primitive(PrimitiveType::Integer),
true,
HashMap::new(),
),
SchemaField::new(
StructField::new(
"value".to_string(),
SchemaDataType::primitive("integer".to_string()),
DataType::Primitive(PrimitiveType::Integer),
true,
HashMap::new(),
),
];

@@ -77,8 +75,8 @@ fn get_record_batch() -> RecordBatch {
}

let schema = ArrowSchema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("value", DataType::Int32, true),
Field::new("id", ArrowDataType::Int32, true),
Field::new("value", ArrowDataType::Int32, true),
]);

let id_array = Int32Array::from(id_vec);
8 changes: 4 additions & 4 deletions rust/tests/command_vacuum.rs
@@ -1,17 +1,17 @@
use chrono::Duration;
use common::clock::TestClock;
use common::TestContext;
use deltalake::kernel::StructType;
use deltalake::operations::vacuum::Clock;
use deltalake::operations::DeltaOps;
use deltalake::Schema;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use serde_json::json;
use std::sync::Arc;

mod common;

/// Basic schema
pub fn get_xy_date_schema() -> Schema {
pub fn get_xy_date_schema() -> StructType {
serde_json::from_value(json!({
"type": "struct",
"fields": [
@@ -24,8 +24,8 @@ pub fn get_xy_date_schema() -> Schema {
}

/// Schema that contains a column prefixed with _
pub fn get_vacuum_underscore_schema() -> Schema {
serde_json::from_value::<Schema>(json!({
pub fn get_vacuum_underscore_schema() -> StructType {
serde_json::from_value::<StructType>(json!({
"type": "struct",
"fields": [
{"name": "x", "type": "integer", "nullable": false, "metadata": {}},
