Write support for additional Arrow datatypes #1044

Merged: 92 commits from support-arrow-datatypes into delta-io:main on Mar 30, 2023

Commits (92)
caffbde
Support for LargeUtf8 and LargeBinary
chitralverma Dec 30, 2022
6109d4b
fix formatting
chitralverma Dec 30, 2022
94423ba
feat: enable passing storage options to Delta table builder via DataF…
gruuya Dec 30, 2022
246bfc7
feat: make `DeltaStorageHandler` pickle serializable (#1016)
roeap Dec 30, 2022
4204ace
feat: clean up dependencies and feature flags (#1014)
roeap Dec 30, 2022
6e035a2
chore: update github actions to latest versions (#1046)
roeap Jan 1, 2023
c7fae50
bump version for dynamodb_lock crate (#1047)
houqp Jan 2, 2023
68e5ebf
refactor: move vacuum command to operations module (#1045)
roeap Jan 5, 2023
3973bbd
build(deps): bump tokio from 1.23.0 to 1.23.1 in /delta-inspect
dependabot[bot] Jan 6, 2023
de6af9d
Expose checkpoint creation for current table state in python (#1058)
ismoshkov Jan 11, 2023
f52f58a
feat: expose function to get table of add actions (#1033)
wjones127 Jan 11, 2023
99703ab
feat: harmonize and simplify storage configuration (#1052)
roeap Jan 11, 2023
205f04b
feat: improve storage location handling (#1065)
roeap Jan 15, 2023
abab3d4
fix: datafusion predicate pushdown and dependencies (#1071)
roeap Jan 17, 2023
a262444
Save operational params in the same way with delta io (#1054)
ismoshkov Jan 17, 2023
34e6cbf
Fix typo in delta-inspect (#1072)
jonahgao Jan 17, 2023
e66f618
test(python): add read / write benchmarks (#933)
wjones127 Jan 17, 2023
a15ce09
Add missing documentation metadata to Cargo.toml (#1077)
johnbatty Jan 18, 2023
22b8ad2
ci: merge several labels
iajoiner Jan 19, 2023
f644ed8
build(deps): bump bumpalo from 3.10.0 to 3.12.0 in /aws/delta-checkpo…
dependabot[bot] Jan 21, 2023
29388f5
build(deps): update serial_test requirement from 0 to 1 (#1088)
dependabot[bot] Jan 23, 2023
bdc6cd8
Add test for: to_scalar_value (#1086)
marijncv Jan 24, 2023
6ca7c7b
test: add Data Acceptance Tests (#909)
wjones127 Jan 25, 2023
cc6e781
Bump deltalake-python version to 0.7.0 (#1098)
wjones127 Jan 26, 2023
e851d47
chore: remove unmaintained ruby bindings (#1102)
roeap Jan 26, 2023
df5175a
chore: clippy (#1106)
roeap Jan 27, 2023
bee698e
Delete lasted Ruby references (#1107)
ismoshkov Jan 27, 2023
7a97227
add test for left_larger_than_right (#1110)
marijncv Jan 28, 2023
1984742
Add an example of writing to a delta table with a RecordBatch
rtyler Jan 21, 2023
09b744a
Update rust/examples/recordbatch-writer.rs
rtyler Jan 22, 2023
2b776da
Update rust/examples/recordbatch-writer.rs
rtyler Jan 22, 2023
2fe9510
fix syntax errors
wjones127 Jan 28, 2023
e07943a
format and put behind feature gate
wjones127 Jan 28, 2023
19f607a
fix: change unexpected field logging level to debug (#1112)
houqp Jan 29, 2023
4afdaef
chore: update datafusion (#1114)
roeap Feb 1, 2023
3d25da0
Implement filesystem check (#1103)
Blajda Feb 3, 2023
3c440d3
build(deps): bump tokio from 1.23.1 to 1.24.2 in /delta-inspect (#1118)
dependabot[bot] Feb 4, 2023
3c797a0
minor: optimize partition lookup for vacuum loop (#1120)
houqp Feb 5, 2023
8d6c55c
add test for get_boolean_from_metadata (#1121)
marijncv Feb 5, 2023
d513f4d
Move roadmap to a pinned issue (#1129)
wjones127 Feb 5, 2023
7bc4031
add test for min_max_schema_for_fields (#1122)
marijncv Feb 6, 2023
11f0dc0
improve debuggability of json ser/de errors (#1119)
houqp Feb 11, 2023
a3bef3c
Set AddAction timestamps to milliseconds. Fixes #1124 (#1133)
guyrt Feb 11, 2023
2e75e22
add test for null_count_schema_for_fields (#1135)
marijncv Feb 11, 2023
a20e154
Make rustls default across all packages (#1097)
wjones127 Feb 11, 2023
de12e5d
build(deps): bump openssl-src from 111.22.0+1.1.1q to 111.25.0+1.1.1t…
dependabot[bot] Feb 12, 2023
ce4b72a
chore(rust): update rust changelog for 0.7.0 (#1137)
wjones127 Feb 12, 2023
200487c
chore: remove star dependencies (#1139)
wjones127 Feb 12, 2023
5f0b17e
add function & test for parsing table_or_uri (#1138)
marijncv Feb 12, 2023
02bcec8
build(deps): update errno requirement from 0.2 to 0.3 (#1142)
dependabot[bot] Feb 13, 2023
744d147
use Path object in writer tests (#1147)
marijncv Feb 14, 2023
e8816db
fix typo (#1166)
spebern Feb 20, 2023
e675fea
add test for extract_partition_values (#1159)
marijncv Feb 22, 2023
d73d94b
fix: avoid some allocations in DeltaStorageHandler (#1115)
roeap Feb 22, 2023
53f1e79
first setup of ruff for python linting (#1158)
marijncv Feb 22, 2023
fc34678
feat: move and update Optimize operation (#1154)
roeap Feb 23, 2023
8bd502a
Selectively overwrite data with python (#1101)
ismoshkov Feb 25, 2023
5bbeafe
fix: update out-of-date doc about datafusion (#1183)
xudong963 Feb 28, 2023
0738124
Enable passing Datafusion session state to WriteBuilder (#1187)
gruuya Mar 1, 2023
debbd75
chore: let dependabot ignore arrow and datafusion (#1192)
iajoiner Mar 1, 2023
2fb6c6e
chore: increment dynamodb_lock version (#1202)
wjones127 Mar 1, 2023
d3a8152
docs(python): update docs (#1155)
wjones127 Mar 3, 2023
9d13ee1
fix: load command for local tables (#1205)
roeap Mar 4, 2023
f2973dc
Python write_deltalake fails if pyarrow table contains binary columns…
rbushri Mar 6, 2023
938647d
feat: extend configuration handling (#1206)
roeap Mar 6, 2023
d543798
add boolean, date, timestamp & binary partition types (#1180)
marijncv Mar 6, 2023
fe7a417
feat: typed commit info (#1207)
roeap Mar 7, 2023
1fe2b04
Add monthly PyPi downloads badge to README (#1213)
MrPowers Mar 8, 2023
0c8e00e
Implement pruning on partition columns (#1179)
Blajda Mar 10, 2023
101bc3f
build(deps): bump datafusion (#1217)
roeap Mar 11, 2023
86b0b9f
docs: update changelog for Rust 0.8.0 release (#1216)
iajoiner Mar 12, 2023
18901a3
Unique delta object store url (#1212)
gruuya Mar 13, 2023
9cd8c36
fix: make sure we handle data checking correctly (#1222)
wjones127 Mar 13, 2023
6d4f775
Merge branch 'delta-io:main' into support-arrow-datatypes
chitralverma Mar 17, 2023
a4f4e4b
Merge branch 'main' into support-arrow-datatypes
chitralverma Mar 17, 2023
86df125
Merge remote-tracking branch 'origin/support-arrow-datatypes' into su…
chitralverma Mar 17, 2023
70a9d09
add test case for python writer
chitralverma Mar 17, 2023
0b61c24
fix lint
chitralverma Mar 23, 2023
a567265
Support for LargeList
chitralverma Mar 26, 2023
d4e8b76
Cast types to large if required
chitralverma Mar 26, 2023
c0b9912
large sub type for array
chitralverma Mar 26, 2023
bf8ed64
large sub type for maps and structs
chitralverma Mar 26, 2023
fcb2fff
fix lint
chitralverma Mar 27, 2023
71d72e6
Merge remote-tracking branch 'upstream/main' into support-arrow-datat…
chitralverma Mar 27, 2023
cc19b0b
update with main
chitralverma Mar 27, 2023
efd9af3
add support for UInt arrow types
chitralverma Mar 27, 2023
211d0b2
add support for Float16 arrow type
chitralverma Mar 27, 2023
4166ed3
add support for Date64 arrow type
chitralverma Mar 28, 2023
5f35dc2
rollback float16
chitralverma Mar 28, 2023
155551b
Apply suggestions from code review
chitralverma Mar 29, 2023
f14c950
fix signature
chitralverma Mar 29, 2023
7da8209
rollback casting for FixedSizedBinary and FixedSizedList
chitralverma Mar 29, 2023
2 changes: 1 addition & 1 deletion python/deltalake/_internal.pyi
@@ -126,7 +126,7 @@ class Schema:
def to_json(self) -> str: ...
@staticmethod
def from_json(json: str) -> "Schema": ...
- def to_pyarrow(self) -> pa.Schema: ...
+ def to_pyarrow(self, as_large_types: bool = False) -> pa.Schema: ...
@staticmethod
def from_pyarrow(type: pa.Schema) -> "Schema": ...

4 changes: 1 addition & 3 deletions python/src/lib.rs
@@ -379,9 +379,7 @@ impl RawDeltaTable {
let column_names: HashSet<&str> = self
._table
.schema()
- .ok_or(PyDeltaTableError::new_err(
-     "table does not yet have a schema",
- ))?
+ .ok_or_else(|| PyDeltaTableError::new_err("table does not yet have a schema"))?
.get_fields()
.iter()
.map(|field| field.get_name())
79 changes: 71 additions & 8 deletions python/src/schema.rs
@@ -950,7 +950,7 @@ pub fn schema_to_pyobject(schema: &Schema, py: Python) -> PyResult<PyObject> {
/// >>> import pyarrow as pa
/// >>> Schema.from_pyarrow(pa.schema({"x": pa.int32(), "y": pa.string()}))
/// Schema([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)])
- #[pyclass(extends=StructType, name="Schema", module="deltalake.schema",
+ #[pyclass(extends = StructType, name = "Schema", module = "deltalake.schema",
text_signature = "(fields)")]
pub struct PySchema;

@@ -1007,15 +1007,78 @@ impl PySchema {

/// Return equivalent PyArrow schema
///
+ /// :param as_large_types: get schema with all variable size types (list,
+ ///     binary, string) as large variants (with int64 indices). This is for
+ ///     compatibility with systems like Polars that only support the large
+ ///     versions of Arrow types.
+ ///
/// :rtype: pyarrow.Schema
- #[pyo3(text_signature = "($self)")]
- fn to_pyarrow(self_: PyRef<'_, Self>) -> PyResult<PyArrowType<ArrowSchema>> {
+ #[pyo3(signature = (as_large_types = false))]
+ fn to_pyarrow(
+     self_: PyRef<'_, Self>,
+     as_large_types: bool,
+ ) -> PyResult<PyArrowType<ArrowSchema>> {
let super_ = self_.as_ref();
- Ok(PyArrowType(
-     (&super_.inner_type.clone())
-         .try_into()
-         .map_err(|err: ArrowError| PyException::new_err(err.to_string()))?,
- ))
+ let res: ArrowSchema = (&super_.inner_type.clone())
+     .try_into()
+     .map_err(|err: ArrowError| PyException::new_err(err.to_string()))?;
+
+ fn convert_to_large_type(field: ArrowField, dt: ArrowDataType) -> ArrowField {
+     match dt {
+         ArrowDataType::Utf8 => field.with_data_type(ArrowDataType::LargeUtf8),
+
+         ArrowDataType::Binary => field.with_data_type(ArrowDataType::LargeBinary),
+
+         ArrowDataType::List(f) => {
+             let sub_field = convert_to_large_type(*f.clone(), f.data_type().clone());
+             field.with_data_type(ArrowDataType::LargeList(Box::from(sub_field)))
+         }
+
+         ArrowDataType::FixedSizeList(f, size) => {
+             let sub_field = convert_to_large_type(*f.clone(), f.data_type().clone());
+             field.with_data_type(ArrowDataType::FixedSizeList(Box::from(sub_field), size))
+         }
+
+         ArrowDataType::Map(f, sorted) => {
+             let sub_field = convert_to_large_type(*f.clone(), f.data_type().clone());
+             field.with_data_type(ArrowDataType::Map(Box::from(sub_field), sorted))
+         }
+
+         ArrowDataType::Struct(fields) => {
+             let sub_fields = fields
+                 .iter()
+                 .map(|f| {
+                     let dt: ArrowDataType = f.data_type().clone();
+                     let f: ArrowField = f.clone();
+
+                     convert_to_large_type(f, dt)
+                 })
+                 .collect();
+
+             field.with_data_type(ArrowDataType::Struct(sub_fields))
+         }
+
+         _ => field,
+     }
+ }
+
+ if as_large_types {
+     let schema = ArrowSchema::new(
+         res.fields
+             .iter()
+             .map(|f| {
+                 let dt: ArrowDataType = f.data_type().clone();
+                 let f: ArrowField = f.clone();
+
+                 convert_to_large_type(f, dt)
+             })
+             .collect(),
+     );
+
+     Ok(PyArrowType(schema))
+ } else {
+     Ok(PyArrowType(res))
+ }
}

/// Create from a PyArrow schema
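
A quick usage sketch of the new flag (paths and data are illustrative, assuming the deltalake Python bindings built from this branch):

import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

# Any pyarrow data works; large_string exercises the new code path.
table = pa.table({"name": pa.array(["Joey", "Ivan"], type=pa.large_string())})
write_deltalake("/tmp/large_types_demo", table)  # illustrative path

dt = DeltaTable("/tmp/large_types_demo")
# By default the schema comes back with the narrow (int32-offset) variants...
assert dt.schema().to_pyarrow().field("name").type == pa.string()
# ...while as_large_types=True yields the int64-offset variants that engines
# such as Polars expect.
assert dt.schema().to_pyarrow(as_large_types=True).field("name").type == pa.large_string()
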
40 changes: 40 additions & 0 deletions python/tests/test_writer.py
@@ -816,3 +816,43 @@ def test_max_partitions_exceeding_fragment_should_fail(
max_partitions=1,
partition_by=["p1", "p2"],
)


def test_large_arrow_types(tmp_path: pathlib.Path):
pylist = [
{"name": "Joey", "gender": b"M", "arr_type": ["x", "y"], "dict": {"a": b"M"}},
{"name": "Ivan", "gender": b"F", "arr_type": ["x", "z"]},
]
schema = pa.schema(
[
pa.field("name", pa.large_string()),
pa.field("gender", pa.large_binary()),
pa.field("arr_type", pa.large_list(pa.large_string())),
pa.field("map_type", pa.map_(pa.large_string(), pa.large_binary())),
pa.field("struct", pa.struct([pa.field("sub", pa.large_string())])),
]
)
table = pa.Table.from_pylist(pylist, schema=schema)

write_deltalake(tmp_path, table)

dt = DeltaTable(tmp_path)
assert table.schema == dt.schema().to_pyarrow(as_large_types=True)


def test_uint_arrow_types(tmp_path: pathlib.Path):
pylist = [
{"num1": 3, "num2": 3, "num3": 3, "num4": 5},
{"num1": 1, "num2": 13, "num3": 35, "num4": 13},
]
schema = pa.schema(
[
pa.field("num1", pa.uint8()),
pa.field("num2", pa.uint16()),
pa.field("num3", pa.uint32()),
pa.field("num4", pa.uint64()),
]
)
table = pa.Table.from_pylist(pylist, schema=schema)

write_deltalake(tmp_path, table)
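
The uint test above only checks that the write succeeds. Since Delta has no unsigned primitives, the columns are persisted as the signed Delta types from the delta_arrow.rs mapping below (uint8 to byte, uint16 to short, uint32 to integer, uint64 to long); a hedged sketch of inspecting that, with illustrative paths:

import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

table = pa.table({"num1": pa.array([3, 1], type=pa.uint8())})
write_deltalake("/tmp/uint_demo", table)  # illustrative path

dt = DeltaTable("/tmp/uint_demo")
# The stored Delta schema records "byte", not an unsigned type.
print(dt.schema().to_json())
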
18 changes: 18 additions & 0 deletions rust/src/delta_arrow.rs
@@ -208,21 +208,33 @@ impl TryFrom<&ArrowDataType> for schema::SchemaDataType {
fn try_from(arrow_datatype: &ArrowDataType) -> Result<Self, ArrowError> {
match arrow_datatype {
ArrowDataType::Utf8 => Ok(schema::SchemaDataType::primitive("string".to_string())),
+ ArrowDataType::LargeUtf8 => Ok(schema::SchemaDataType::primitive("string".to_string())),
ArrowDataType::Int64 => Ok(schema::SchemaDataType::primitive("long".to_string())), // undocumented type
ArrowDataType::Int32 => Ok(schema::SchemaDataType::primitive("integer".to_string())),
ArrowDataType::Int16 => Ok(schema::SchemaDataType::primitive("short".to_string())),
ArrowDataType::Int8 => Ok(schema::SchemaDataType::primitive("byte".to_string())),
+ ArrowDataType::UInt64 => Ok(schema::SchemaDataType::primitive("long".to_string())), // undocumented type
+ ArrowDataType::UInt32 => Ok(schema::SchemaDataType::primitive("integer".to_string())),
+ ArrowDataType::UInt16 => Ok(schema::SchemaDataType::primitive("short".to_string())),
+ ArrowDataType::UInt8 => Ok(schema::SchemaDataType::primitive("byte".to_string())),
ArrowDataType::Float32 => Ok(schema::SchemaDataType::primitive("float".to_string())),
ArrowDataType::Float64 => Ok(schema::SchemaDataType::primitive("double".to_string())),
ArrowDataType::Boolean => Ok(schema::SchemaDataType::primitive("boolean".to_string())),
ArrowDataType::Binary => Ok(schema::SchemaDataType::primitive("binary".to_string())),
ArrowDataType::FixedSizeBinary(_) => {
Ok(schema::SchemaDataType::primitive("binary".to_string()))
}
+ ArrowDataType::LargeBinary => {
+     Ok(schema::SchemaDataType::primitive("binary".to_string()))
+ }
ArrowDataType::Decimal128(p, s) => Ok(schema::SchemaDataType::primitive(format!(
"decimal({p},{s})"
))),
+ ArrowDataType::Decimal256(p, s) => Ok(schema::SchemaDataType::primitive(format!(
+     "decimal({p},{s})"
+ ))),
ArrowDataType::Date32 => Ok(schema::SchemaDataType::primitive("date".to_string())),
+ ArrowDataType::Date64 => Ok(schema::SchemaDataType::primitive("date".to_string())),
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
Ok(schema::SchemaDataType::primitive("timestamp".to_string()))
}
@@ -244,6 +256,12 @@ impl TryFrom<&ArrowDataType> for schema::SchemaDataType {
(*field).is_nullable(),
)))
}
+ ArrowDataType::LargeList(field) => {
+     Ok(schema::SchemaDataType::array(schema::SchemaTypeArray::new(
+         Box::new((*field).data_type().try_into()?),
+         (*field).is_nullable(),
+     )))
+ }
ArrowDataType::FixedSizeList(field, _) => {
Ok(schema::SchemaDataType::array(schema::SchemaTypeArray::new(
Box::new((*field).data_type().try_into()?),
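
These conversions are reachable from Python through Schema.from_pyarrow (see the schema.rs docstring above); a small sketch, assuming the bindings from this branch:

import pyarrow as pa
from deltalake.schema import Schema

# Large and unsigned Arrow types now fold onto existing Delta primitives:
# LargeUtf8 -> string, LargeBinary -> binary, UInt32 -> integer, Date64 -> date.
schema = Schema.from_pyarrow(
    pa.schema(
        [
            pa.field("s", pa.large_string()),
            pa.field("b", pa.large_binary()),
            pa.field("n", pa.uint32()),
            pa.field("d", pa.date64()),
        ]
    )
)
print(schema.to_json())
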
27 changes: 12 additions & 15 deletions rust/tests/datafusion_test.rs
@@ -343,7 +343,7 @@ async fn get_scan_metrics(
visit_execution_plan(&plan, &mut metrics).unwrap();
}

- return Ok(metrics);
+ Ok(metrics)
}

fn create_all_types_batch(not_null_rows: usize, null_rows: usize, offset: usize) -> RecordBatch {
@@ -488,30 +488,27 @@ async fn test_files_scanned() -> Result<()> {
let table = append_to_table(table, batch).await;

let metrics = get_scan_metrics(&table, &state, &[]).await?;
- assert!(metrics.num_scanned_files() == 3);
+ assert_eq!(metrics.num_scanned_files(), 3);

- // (Column name, value from file 1, value from file 2, value from file 3, non existant value)
+ // (Column name, value from file 1, value from file 2, value from file 3, non existent value)
let tests = [
TestCase::new("utf8", |value| lit(value.to_string())),
TestCase::new("int64", |value| lit(value)),
TestCase::new("int64", lit),
TestCase::new("int32", |value| lit(value as i32)),
TestCase::new("int16", |value| lit(value as i16)),
TestCase::new("int8", |value| lit(value as i8)),
TestCase::new("float64", |value| lit(value as f64)),
TestCase::new("float32", |value| lit(value as f32)),
TestCase::new("timestamp", |value| {
- lit(ScalarValue::TimestampMicrosecond(
-     Some(value * 1_000_000),
-     None,
- ))
+ lit(TimestampMicrosecond(Some(value * 1_000_000), None))
}),
// TODO: I think decimal statistics are being written to the log incorrectly. The underlying i128 is written
- // not the proper string representation as specified by the percision and scale
+ // not the proper string representation as specified by the precision and scale
TestCase::new("decimal", |value| {
lit(Decimal128(Some((value * 100).into()), 10, 2))
}),
- // TODO: The writer does not write complete statistiics for date columns
- TestCase::new("date", |value| lit(ScalarValue::Date32(Some(value as i32)))),
+ // TODO: The writer does not write complete statistics for date columns
+ TestCase::new("date", |value| lit(Date32(Some(value as i32)))),
// TODO: The writer does not write complete statistics for binary columns
TestCase::new("binary", |value| lit(value.to_string().as_bytes())),
];
@@ -544,7 +541,7 @@ async fn test_files_scanned() -> Result<()> {
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 0);

- // Conjuction
+ // Conjunction
let e = col(column)
.gt(file1_value.clone())
.and(col(column).lt(file2_value.clone()));
@@ -617,7 +614,7 @@ async fn test_files_scanned() -> Result<()> {
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 0);

- // Conjuction
+ // Conjunction
let e = col(column)
.gt(file1_value.clone())
.and(col(column).lt(file2_value));
@@ -679,12 +676,12 @@ async fn test_files_scanned() -> Result<()> {
// Check pruning for null partitions
let e = col("k").is_null();
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
- assert!(metrics.num_scanned_files() == 1);
+ assert_eq!(metrics.num_scanned_files(), 1);

// Check pruning for null partitions. Since there are no record count statistics pruning cannot be done
let e = col("k").is_not_null();
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
- assert!(metrics.num_scanned_files() == 2);
+ assert_eq!(metrics.num_scanned_files(), 2);

Ok(())
}