Skip to content

Commit

Permalink
Add timestamp support to C data interface
Browse files Browse the repository at this point in the history
Add extra date32 integration test
  • Loading branch information
alippai committed Jun 20, 2021
1 parent 47c3c8c commit de45abe
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ jobs:
python -m venv venv
source venv/bin/activate
pip install maturin==0.8.2 toml==0.10.1 pyarrow==1.0.0
pip install maturin==0.8.2 toml==0.10.1 pyarrow==1.0.0 pytz
maturin develop
python -m unittest discover tests
Expand Down
2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT" }
pyo3 = { version = "0.12.1", features = ["extension-module"] }

[package.metadata.maturin]
requires-dist = ["pyarrow>=1"]
requires-dist = ["pyarrow>=1", "pytz"]
62 changes: 61 additions & 1 deletion arrow-pyarrow-integration-testing/tests/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
# under the License.

import unittest
from datetime import date, datetime
from decimal import Decimal

import pyarrow
import arrow_pyarrow_integration_testing
import pyarrow
from pytz import timezone


class TestCase(unittest.TestCase):
Expand Down Expand Up @@ -80,6 +82,64 @@ def test_time32_python(self):
# No leak of C++ memory
self.assertEqual(old_allocated, pyarrow.total_allocated_bytes())

def test_date32_python(self):
    """
    Python -> Rust -> Python

    Builds a date32 array (including a null), hands it to
    ``arrow_pyarrow_integration_testing.concatenate`` and checks that the
    result equals the input values repeated twice. Afterwards verifies that
    pyarrow's allocated byte count is back to its starting value.
    """
    baseline_bytes = pyarrow.total_allocated_bytes()
    values = [None, date(1990, 3, 9), date(2021, 6, 20)]

    source = pyarrow.array(values, pyarrow.date32())
    result = arrow_pyarrow_integration_testing.concatenate(source)
    expected = pyarrow.array(values * 2, pyarrow.date32())
    self.assertEqual(result, expected)

    # Release every Arrow array before measuring, in the same order as
    # they were created, so the allocation check below is meaningful.
    del source, result, expected

    # No leak of C++ memory
    self.assertEqual(baseline_bytes, pyarrow.total_allocated_bytes())

def test_timestamp_python(self):
    """
    Python -> Rust -> Python

    Builds a microsecond timestamp array without a timezone (including a
    null), hands it to ``arrow_pyarrow_integration_testing.concatenate``
    and checks that the result equals the input values repeated twice.
    Afterwards verifies that pyarrow's allocated byte count is back to its
    starting value.
    """
    baseline_bytes = pyarrow.total_allocated_bytes()
    values = [
        None,
        datetime(2021, 1, 1, 1, 1, 1, 1),
        datetime(2020, 3, 9, 1, 1, 1, 1),
    ]

    source = pyarrow.array(values, pyarrow.timestamp("us"))
    result = arrow_pyarrow_integration_testing.concatenate(source)
    expected = pyarrow.array(values * 2, pyarrow.timestamp("us"))
    self.assertEqual(result, expected)

    # Release every Arrow array before measuring, in the same order as
    # they were created, so the allocation check below is meaningful.
    del source, result, expected

    # No leak of C++ memory
    self.assertEqual(baseline_bytes, pyarrow.total_allocated_bytes())

def test_timestamp_tz_python(self):
    """
    Python -> Rust -> Python

    Builds a microsecond timestamp array tagged with the
    "America/New_York" timezone (including a null), hands it to
    ``arrow_pyarrow_integration_testing.concatenate`` and checks that the
    result equals the input values repeated twice. Afterwards verifies that
    pyarrow's allocated byte count is back to its starting value.
    """
    old_allocated = pyarrow.total_allocated_bytes()
    tz_name = "America/New_York"
    tz = timezone(tz_name)
    # pytz zones must be attached with localize(). Passing a pytz zone via
    # ``tzinfo=`` to the datetime constructor silently selects the zone's
    # earliest (local mean time) offset rather than the EST/EDT offset that
    # applies on the given date, so the test data would not be the wall-clock
    # instants written here.
    py_array = [
        None,
        tz.localize(datetime(2021, 1, 1, 1, 1, 1, 1)),
        tz.localize(datetime(2020, 3, 9, 1, 1, 1, 1)),
    ]
    a = pyarrow.array(py_array, pyarrow.timestamp("us", tz=tz_name))
    b = arrow_pyarrow_integration_testing.concatenate(a)
    expected = pyarrow.array(
        py_array + py_array, pyarrow.timestamp("us", tz=tz_name)
    )
    self.assertEqual(b, expected)
    # Drop all Arrow arrays before comparing allocation counters.
    del a
    del b
    del expected
    # No leak of C++ memory
    self.assertEqual(old_allocated, pyarrow.total_allocated_bytes())

def test_decimal_python(self):
"""
Python -> Rust -> Python
Expand Down
71 changes: 70 additions & 1 deletion arrow/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,20 @@ fn to_field(schema: &FFI_ArrowSchema) -> Result<Field> {
})?;
DataType::Decimal(parsed_precision, parsed_scale)
}
["tss", ""] => DataType::Timestamp(TimeUnit::Second, None),
["tsm", ""] => DataType::Timestamp(TimeUnit::Millisecond, None),
["tsu", ""] => DataType::Timestamp(TimeUnit::Microsecond, None),
["tsn", ""] => DataType::Timestamp(TimeUnit::Nanosecond, None),
["tss", tz] => DataType::Timestamp(TimeUnit::Second, Some(tz.to_string())),
["tsm", tz] => {
DataType::Timestamp(TimeUnit::Millisecond, Some(tz.to_string()))
}
["tsu", tz] => {
DataType::Timestamp(TimeUnit::Microsecond, Some(tz.to_string()))
}
["tsn", tz] => {
DataType::Timestamp(TimeUnit::Nanosecond, Some(tz.to_string()))
}
_ => {
return Err(ArrowError::CDataInterface(format!(
"The datatype \"{:?}\" is still not supported in Rust implementation",
Expand Down Expand Up @@ -345,6 +359,22 @@ fn to_format(data_type: &DataType) -> Result<String> {
DataType::Time32(TimeUnit::Millisecond) => "ttm",
DataType::Time64(TimeUnit::Microsecond) => "ttu",
DataType::Time64(TimeUnit::Nanosecond) => "ttn",
DataType::Timestamp(TimeUnit::Second, None) => "tss:",
DataType::Timestamp(TimeUnit::Millisecond, None) => "tsm:",
DataType::Timestamp(TimeUnit::Microsecond, None) => "tsu:",
DataType::Timestamp(TimeUnit::Nanosecond, None) => "tsn:",
DataType::Timestamp(TimeUnit::Second, Some(tz)) => {
return Ok(format!("tss:{}", tz))
}
DataType::Timestamp(TimeUnit::Millisecond, Some(tz)) => {
return Ok(format!("tsm:{}", tz))
}
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
return Ok(format!("tsu:{}", tz))
}
DataType::Timestamp(TimeUnit::Nanosecond, Some(tz)) => {
return Ok(format!("tsn:{}", tz))
}
DataType::List(_) => "+l",
DataType::LargeList(_) => "+L",
DataType::Struct(_) => "+s",
Expand Down Expand Up @@ -377,6 +407,7 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
(DataType::Float32, 1) => size_of::<f32>() * 8,
(DataType::Float64, 1) => size_of::<f64>() * 8,
(DataType::Decimal(..), 1) => size_of::<i128>() * 8,
(DataType::Timestamp(..), 1) => size_of::<i64>() * 8,
// primitive types have a single buffer
(DataType::Boolean, _) |
(DataType::UInt8, _) |
Expand All @@ -389,7 +420,8 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
(DataType::Int64, _) | (DataType::Date64, _) | (DataType::Time64(_), _) |
(DataType::Float32, _) |
(DataType::Float64, _) |
(DataType::Decimal(..), _) => {
(DataType::Decimal(..), _) |
(DataType::Timestamp(..), _) => {
return Err(ArrowError::CDataInterface(format!(
"The datatype \"{:?}\" expects 2 buffers, but requested {}. Please verify that the C data interface is correctly implemented.",
data_type, i
Expand Down Expand Up @@ -872,6 +904,7 @@ mod tests {
make_array, Array, ArrayData, BinaryOffsetSizeTrait, BooleanArray, DecimalArray,
DecimalBuilder, GenericBinaryArray, GenericListArray, GenericStringArray,
Int32Array, OffsetSizeTrait, StringOffsetSizeTrait, Time32MillisecondArray,
TimestampMillisecondArray,
};
use crate::compute::kernels;
use crate::datatypes::Field;
Expand Down Expand Up @@ -1143,4 +1176,40 @@ mod tests {
// (drop/release)
Ok(())
}

/// Round-trips a millisecond timestamp array (with one null entry) through
/// the FFI export/import path, concatenates the imported array with itself,
/// and checks the values come back unchanged and in order.
#[test]
fn test_timestamp() -> Result<()> {
    // Source array built natively.
    let native = TimestampMillisecondArray::from(vec![None, Some(1), Some(2)]);

    // Export: wrap a clone of the array's data in the FFI representation.
    let exported = ArrowArray::try_from(native.data().clone())?;

    // Import, as a consumer would: rebuild array data from the FFI struct
    // and turn it back into a dynamically typed array.
    let imported = make_array(ArrayData::try_from(exported)?);

    // Run a kernel over the imported array: concatenate it with itself.
    let doubled =
        kernels::concat::concat(&[imported.as_ref(), imported.as_ref()]).unwrap();
    let actual = doubled
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();

    // The result must be the original three values repeated twice.
    let expected = TimestampMillisecondArray::from(vec![
        None,
        Some(1),
        Some(2),
        None,
        Some(1),
        Some(2),
    ]);
    assert_eq!(actual, &expected);

    // Everything created above is dropped (and the FFI data released) here.
    Ok(())
}
}

0 comments on commit de45abe

Please sign in to comment.