diff --git a/.github/workflows/rust_cron.yml b/.github/workflows/rust_cron.yml index fed6a996540ba..11af0aba36f7f 100644 --- a/.github/workflows/rust_cron.yml +++ b/.github/workflows/rust_cron.yml @@ -54,3 +54,26 @@ jobs: continue-on-error: true shell: bash run: bash <(curl -s https://codecov.io/bash) + + pyarrow-integration: + name: AMD64 Debian 10 Rust ${{ matrix.rust }} Pyarrow integration + runs-on: ubuntu-latest + if: ${{ !contains(github.event.pull_request.title, 'WIP') && github.repository == 'apache/arrow' }} + strategy: + fail-fast: false + matrix: + rust: [nightly-2020-11-19] + env: + RUST: ${{ matrix.rust }} + steps: + - name: Checkout Arrow + uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Fetch Submodules and Tags + run: ci/scripts/util_checkout.sh + - name: Run test + shell: bash + run: | + echo ${RUST} > rust/rust-toolchain && + ci/scripts/rust_pyarrow_integration.sh `pwd` `pwd`/build $RUST diff --git a/ci/docker/debian-10-rust.dockerfile b/ci/docker/debian-10-rust.dockerfile index 39df235ac89b0..99507cf796ac2 100644 --- a/ci/docker/debian-10-rust.dockerfile +++ b/ci/docker/debian-10-rust.dockerfile @@ -53,7 +53,8 @@ ENV CARGO_HOME="/rust/cargo" \ # compiled dependencies. Create the directories and place an empty lib.rs # files. COPY rust /arrow/rust -RUN mkdir \ +RUN mkdir -p \ + /arrow/rust/arrow-pyarrow-integration-testing/src \ /arrow/rust/arrow-flight/src \ /arrow/rust/arrow/src \ /arrow/rust/benchmarks/src \ @@ -63,6 +64,7 @@ RUN mkdir \ /arrow/rust/parquet_derive/src \ /arrow/rust/parquet_derive_test/src && \ touch \ + /arrow/rust/arrow-pyarrow-integration-testing/src/lib.rs \ /arrow/rust/arrow-flight/src/lib.rs \ /arrow/rust/arrow/src/lib.rs \ /arrow/rust/benchmarks/src/lib.rs \ diff --git a/ci/scripts/rust_pyarrow_integration.sh b/ci/scripts/rust_pyarrow_integration.sh new file mode 100755 index 0000000000000..e4208114144f0 --- /dev/null +++ b/ci/scripts/rust_pyarrow_integration.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +set -ex + +arrow_dir=${1} +source_dir=${1}/rust +build_dir=${2}/rust +rust=${3} + +export ARROW_TEST_DATA=${arrow_dir}/testing/data +export PARQUET_TEST_DATA=${arrow_dir}/cpp/submodules/parquet-testing/data +export CARGO_TARGET_DIR=${build_dir} + +pushd ${source_dir}/arrow-pyarrow-integration-testing + +#rustup default ${rust} +#rustup component add rustfmt --toolchain ${rust}-x86_64-unknown-linux-gnu +python3 -m venv venv +venv/bin/pip install maturin==0.8.2 toml==0.10.1 pyarrow==1.0.0 + +source venv/bin/activate +maturin develop +python -m unittest discover tests + +popd diff --git a/dev/release/00-prepare-test.rb b/dev/release/00-prepare-test.rb index b601f14141045..7ae4473b3d28d 100644 --- a/dev/release/00-prepare-test.rb +++ b/dev/release/00-prepare-test.rb @@ -271,6 +271,15 @@ def test_version_pre_tag "+arrow = { path = \"../arrow\", version = \"#{@release_version}\" }"], ], }, + { + path: "rust/arrow-pyarrow-integration-testing/Cargo.toml", + hunks: [ + ["-version = \"#{@snapshot_version}\"", + "+version = \"#{@release_version}\""], + ["-arrow = { path = \"../arrow\", version = \"#{@snapshot_version}\" }", + "+arrow = { path = \"../arrow\", version = \"#{@release_version}\" }"], + ], + }, { path: "rust/arrow/Cargo.toml", hunks: [ @@ -509,6 +518,15 @@ def test_version_post_tag "+arrow = { path = \"../arrow\", version = \"#{@next_snapshot_version}\" }"], ], }, + { + path: "rust/arrow-pyarrow-integration-testing/Cargo.toml", + hunks: [ + ["-version = \"#{@release_version}\"", + "+version = \"#{@next_snapshot_version}\""], + ["-arrow = { path = \"../arrow\", version = \"#{@release_version}\" }", + "+arrow = { path = \"../arrow\", version = \"#{@next_snapshot_version}\" }"], + ], + }, { path: "rust/arrow/Cargo.toml", hunks: [ diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 459fe8fd4edd1..16e34de7f1472 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -26,3 +26,8 @@ members = [ "integration-testing", "benchmarks", ] + +# this package is excluded because it requires different compilation flags, thereby significantly changing +# how it is compiled within the workspace, causing the whole workspace to be compiled from scratch +# this way, this is a stand-alone package that compiles independently of the others. +exclude = ["arrow-pyarrow-integration-testing"] diff --git a/rust/arrow-pyarrow-integration-testing/.cargo/config b/rust/arrow-pyarrow-integration-testing/.cargo/config new file mode 100644 index 0000000000000..a127967f66c5a --- /dev/null +++ b/rust/arrow-pyarrow-integration-testing/.cargo/config @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[target.x86_64-apple-darwin] +rustflags = [ + "-C", "link-arg=-undefined", + "-C", "link-arg=dynamic_lookup", +] \ No newline at end of file diff --git a/rust/arrow-pyarrow-integration-testing/.gitignore b/rust/arrow-pyarrow-integration-testing/.gitignore new file mode 100644 index 0000000000000..82adb58b4d648 --- /dev/null +++ b/rust/arrow-pyarrow-integration-testing/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +venv diff --git a/rust/arrow-pyarrow-integration-testing/Cargo.toml b/rust/arrow-pyarrow-integration-testing/Cargo.toml new file mode 100644 index 0000000000000..7a99855e7a400 --- /dev/null +++ b/rust/arrow-pyarrow-integration-testing/Cargo.toml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "arrow-pyarrow-integration-testing" +description = "" +version = "3.0.0-SNAPSHOT" +homepage = "https://github.com/apache/arrow" +repository = "https://github.com/apache/arrow" +authors = ["Apache Arrow <dev@arrow.apache.org>"] +license = "Apache-2.0" +keywords = [ "arrow" ] +edition = "2018" + +[lib] +name = "arrow_pyarrow_integration_testing" +crate-type = ["cdylib"] + +[dependencies] +arrow = { path = "../arrow", version = "3.0.0-SNAPSHOT" } +pyo3 = { version = "0.12.1", features = ["extension-module"] } + +[package.metadata.maturin] +requires-dist = ["pyarrow>=1"] diff --git a/rust/arrow-pyarrow-integration-testing/README.md b/rust/arrow-pyarrow-integration-testing/README.md new file mode 100644 index 0000000000000..7e78aa9ec70e0 --- /dev/null +++ b/rust/arrow-pyarrow-integration-testing/README.md @@ -0,0 +1,57 @@ + + +# Arrow C integration + +This is a Rust crate that tests compatibility between Rust's Arrow implementation and PyArrow. + +Note that this crate uses two languages and an external ABI: +* `Rust` +* `Python` +* C ABI privately exposed by `PyArrow`. + +## Basic idea + +PyArrow exposes a C ABI to convert Arrow arrays from and to its C implementation, see [here](https://arrow.apache.org/docs/format/CDataInterface.html). + +This package uses the equivalent struct in Rust (`arrow::array::ArrowArray`), and verifies that +we can use pyarrow's interface to move pointers from and to Rust.
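The same round trip can also be sketched purely in Rust with the APIs this PR adds (`Array::to_raw` for export and `make_array_from_raw` for import); the snippet below is a minimal illustrative sketch modeled on the doctest added in `rust/arrow/src/ffi.rs`, not code from the patch itself:

```rust
use arrow::array::{make_array_from_raw, Array, Int64Array};
use arrow::error::Result;

fn main() -> Result<()> {
    // build an array natively and export it as the two C Data Interface pointers
    let array = Int64Array::from(vec![Some(1), None, Some(3)]);
    let (array_ptr, schema_ptr) = array.to_raw()?;

    // a consumer (pyarrow, in this crate's tests) would now read from these pointers;
    // here we immediately re-import them to close the loop
    let imported = unsafe { make_array_from_raw(array_ptr, schema_ptr)? };
    assert_eq!(imported.len(), 3);
    Ok(())
}
```

In this crate's tests the consumer on the other side of the pointers is pyarrow's private `_export_to_c` / `_import_from_c` API, exercised from `tests/test_sql.py`.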
+ +## Relevant literature + +* [Arrow's CDataInterface](https://arrow.apache.org/docs/format/CDataInterface.html) +* [Rust's FFI](https://doc.rust-lang.org/nomicon/ffi.html) +* [Pyarrow private binds](https://github.com/apache/arrow/blob/ae1d24efcc3f1ac2a876d8d9f544a34eb04ae874/python/pyarrow/array.pxi#L1226) +* [PyO3](https://docs.rs/pyo3/0.12.1/pyo3/index.html) + +## How to develop + +```bash +# prepare development environment (used to build wheel / install in development) +python -m venv venv +venv/bin/pip install maturin==0.8.2 toml==0.10.1 pyarrow==1.0.0 +``` + +Whenever rust code changes (your changes or via git pull): + +```bash +source venv/bin/activate +maturin develop +python -m unittest discover tests +``` diff --git a/rust/arrow-pyarrow-integration-testing/pyproject.toml b/rust/arrow-pyarrow-integration-testing/pyproject.toml new file mode 100644 index 0000000000000..27480690e06cc --- /dev/null +++ b/rust/arrow-pyarrow-integration-testing/pyproject.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[build-system] +requires = ["maturin"] +build-backend = "maturin" diff --git a/rust/arrow-pyarrow-integration-testing/src/lib.rs b/rust/arrow-pyarrow-integration-testing/src/lib.rs new file mode 100644 index 0000000000000..73f97a3a713d2 --- /dev/null +++ b/rust/arrow-pyarrow-integration-testing/src/lib.rs @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This library demonstrates a minimal usage of Rust's C data interface to pass +//! arrays from and to Python. 
+ +use std::error; +use std::fmt; +use std::sync::Arc; + +use pyo3::exceptions::PyOSError; +use pyo3::wrap_pyfunction; +use pyo3::{libc::uintptr_t, prelude::*}; + +use arrow::array::{make_array_from_raw, ArrayRef, Int64Array}; +use arrow::compute::kernels; +use arrow::error::ArrowError; +use arrow::ffi; + +/// an error that bridges ArrowError with a Python error +#[derive(Debug)] +enum PyO3ArrowError { + ArrowError(ArrowError), +} + +impl fmt::Display for PyO3ArrowError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + PyO3ArrowError::ArrowError(ref e) => e.fmt(f), + } + } +} + +impl error::Error for PyO3ArrowError { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match *self { + // The cause is the underlying implementation error type. Is implicitly + // cast to the trait object `&error::Error`. This works because the + // underlying type already implements the `Error` trait. + PyO3ArrowError::ArrowError(ref e) => Some(e), + } + } +} + +impl From<ArrowError> for PyO3ArrowError { + fn from(err: ArrowError) -> PyO3ArrowError { + PyO3ArrowError::ArrowError(err) + } +} + +impl From<PyO3ArrowError> for PyErr { + fn from(err: PyO3ArrowError) -> PyErr { + PyOSError::new_err(err.to_string()) + } +} + +fn to_rust(ob: PyObject, py: Python) -> PyResult<ArrayRef> { + // prepare a pointer to receive the Array struct + let (array_pointer, schema_pointer) = + ffi::ArrowArray::into_raw(unsafe { ffi::ArrowArray::empty() }); + + // make the conversion through PyArrow's private API + // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds + ob.call_method1( + py, + "_export_to_c", + (array_pointer as uintptr_t, schema_pointer as uintptr_t), + )?; + + let array = unsafe { make_array_from_raw(array_pointer, schema_pointer) } + .map_err(|e| PyO3ArrowError::from(e))?; + Ok(array) +} + +fn to_py(array: ArrayRef, py: Python) -> PyResult<PyObject> { + let (array_pointer, schema_pointer) = + array.to_raw().map_err(|e| PyO3ArrowError::from(e))?; + + let pa = py.import("pyarrow")?; + + let array = pa.getattr("Array")?.call_method1( + "_import_from_c", + (array_pointer as uintptr_t, schema_pointer as uintptr_t), + )?; + Ok(array.to_object(py)) +} + +/// Returns `array + array` of an int64 array.
+#[pyfunction] +fn double(array: PyObject, py: Python) -> PyResult<PyObject> { + // import + let array = to_rust(array, py)?; + + // perform some operation + let array = + array + .as_any() + .downcast_ref::<Int64Array>() + .ok_or(PyO3ArrowError::ArrowError(ArrowError::ParseError( + "Expects an int64".to_string(), + )))?; + let array = + kernels::arithmetic::add(&array, &array).map_err(|e| PyO3ArrowError::from(e))?; + let array = Arc::new(array); + + // export + to_py(array, py) +} + +/// calls a lambda function that receives and returns an array +/// whose result must be the array multiplied by two +#[pyfunction] +fn double_py(lambda: PyObject, py: Python) -> PyResult<bool> { + // create + let array = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)])); + let expected = Arc::new(Int64Array::from(vec![Some(2), None, Some(6)])) as ArrayRef; + + // to py + let array = to_py(array, py)?; + + let array = lambda.call1(py, (array,))?; + + let array = to_rust(array, py)?; + + Ok(array == expected) +} + +/// Returns the substring +#[pyfunction] +fn substring(array: PyObject, start: i64, py: Python) -> PyResult<PyObject> { + // import + let array = to_rust(array, py)?; + + // substring + let array = kernels::substring::substring(array.as_ref(), start, &None) + .map_err(|e| PyO3ArrowError::from(e))?; + + // export + to_py(array, py) +} + +#[pymodule] +fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_wrapped(wrap_pyfunction!(double))?; + m.add_wrapped(wrap_pyfunction!(double_py))?; + m.add_wrapped(wrap_pyfunction!(substring))?; + Ok(()) +} diff --git a/rust/arrow-pyarrow-integration-testing/tests/test_sql.py b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py new file mode 100644 index 0000000000000..bd332fa865f45 --- /dev/null +++ b/rust/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +import unittest + +import pyarrow +import arrow_pyarrow_integration_testing + + +class TestCase(unittest.TestCase): + def test_primitive_python(self): + """ + Python -> Rust -> Python + """ + old_allocated = pyarrow.total_allocated_bytes() + a = pyarrow.array([1, 2, 3]) + b = arrow_pyarrow_integration_testing.double(a) + self.assertEqual(b, pyarrow.array([2, 4, 6])) + del a + del b + # No leak of C++ memory + self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) + + def test_primitive_rust(self): + """ + Rust -> Python -> Rust + """ + old_allocated = pyarrow.total_allocated_bytes() + + def double(array): + array = array.to_pylist() + return pyarrow.array([x * 2 if x is not None else None for x in array]) + + is_correct = arrow_pyarrow_integration_testing.double_py(double) + self.assertTrue(is_correct) + # No leak of C++ memory + self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) + + def test_string_python(self): + """ + Python -> Rust -> Python + """ + old_allocated = pyarrow.total_allocated_bytes() + a = pyarrow.array(["a", None, "ccc"]) + b = arrow_pyarrow_integration_testing.substring(a, 1) + self.assertEqual(b, pyarrow.array(["", None, "cc"])) + del a + del b + # No leak of C++ memory + self.assertEqual(old_allocated, pyarrow.total_allocated_bytes()) diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index 0e35fcc746a2d..0dc43e075e0c8 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -57,6 +57,10 @@ default = [] avx512 = [] simd = ["packed_simd"] prettyprint = ["prettytable-rs"] +# this is only intended to be used in single-threaded programs: it verifies that +# all allocated memory is being released (no memory leaks). +# See README for details +memory-check = [] [dev-dependencies] criterion = "0.3" diff --git a/rust/arrow/README.md b/rust/arrow/README.md index dc99f5f23e135..cbeeebab48667 100644 --- a/rust/arrow/README.md +++ b/rust/arrow/README.md @@ -40,6 +40,19 @@ cargo test runs all the tests. +### How to check memory allocations + +This crate heavily uses `unsafe` due to how memory is allocated in cache lines. +We have a small tool to verify that this crate does not leak memory (beyond what the compiler already does) + +Run it with + +```bash +cargo test --features memory-check --lib -- --test-threads 1 +``` + +This runs all unit-tests on a single thread and counts all allocations and de-allocations. + ## Examples The examples folder shows how to construct some different types of Arrow diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs index 9c1207833ecbb..dcf8284fd0b64 100644 --- a/rust/arrow/src/array/array.rs +++ b/rust/arrow/src/array/array.rs @@ -15,13 +15,15 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::fmt; use std::sync::Arc; +use std::{any::Any, convert::TryFrom}; use super::ArrayDataRef; use super::*; use crate::array::equal_json::JsonEqual; +use crate::error::Result; +use crate::ffi; /// Trait for dealing with different types of array at runtime when the type of the /// array is not known in advance. @@ -198,6 +200,15 @@ pub trait Array: fmt::Debug + Send + Sync + JsonEqual { /// Returns the total number of bytes of memory occupied physically by this array. 
fn get_array_memory_size(&self) -> usize; + + /// returns two pointers that represent this array in the C Data Interface (FFI) + fn to_raw( + &self, + ) -> Result<(*const ffi::FFI_ArrowArray, *const ffi::FFI_ArrowSchema)> { + let data = self.data().as_ref().clone(); + let array = ffi::ArrowArray::try_from(data)?; + Ok(ffi::ArrowArray::into_raw(array)) + } } /// A reference-counted reference to a generic `Array`. @@ -312,6 +323,18 @@ pub fn make_array(data: ArrayDataRef) -> ArrayRef { } } +/// Creates a new array from two FFI pointers. Used to import arrays from the C Data Interface +/// # Safety +/// Assumes that these pointers represent valid C Data Interfaces, both in memory +/// representation and lifetime via the `release` mechanism. +pub unsafe fn make_array_from_raw( + array: *const ffi::FFI_ArrowArray, + schema: *const ffi::FFI_ArrowSchema, +) -> Result<ArrayRef> { + let array = ffi::ArrowArray::try_from_raw(array, schema)?; + let data = Arc::new(ArrayData::try_from(array)?); + Ok(make_array(data)) +} // Helper function for printing potentially long arrays. pub(super) fn print_long_array( array: &A, diff --git a/rust/arrow/src/array/ffi.rs b/rust/arrow/src/array/ffi.rs new file mode 100644 index 0000000000000..6fe44306f8346 --- /dev/null +++ b/rust/arrow/src/array/ffi.rs @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains functionality to load an ArrayData from the C Data Interface + +use std::convert::TryFrom; + +use crate::{ + error::{ArrowError, Result}, + ffi, +}; + +use super::ArrayData; + +impl TryFrom<ffi::ArrowArray> for ArrayData { + type Error = ArrowError; + + fn try_from(value: ffi::ArrowArray) -> Result<Self> { + let data_type = value.data_type()?; + let len = value.len(); + let offset = value.offset(); + let null_count = value.null_count(); + let buffers = value.buffers()?; + let null_bit_buffer = value.null_bit_buffer(); + + Ok(ArrayData::new( + data_type, + len, + Some(null_count), + null_bit_buffer, + offset, + buffers, + // this is empty because ffi still does not support it. + // this is ok because FFI only supports datatypes without children + vec![], + )) + } +} + +impl TryFrom<ArrayData> for ffi::ArrowArray { + type Error = ArrowError; + + fn try_from(value: ArrayData) -> Result<Self> { + let len = value.len(); + let offset = value.offset() as usize; + let null_count = value.null_count(); + let buffers = value.buffers().to_vec(); + let null_buffer = value.null_buffer().cloned(); + + unsafe { + ffi::ArrowArray::try_new( + value.data_type(), + len, + null_count, + null_buffer, + offset, + buffers, + // this is empty because ffi still does not support it.
+ // this is ok because FFI only supports datatypes without children + vec![], + ) + } + } +} + +#[cfg(test)] +mod tests { + use crate::error::Result; + use crate::{ + array::{Array, ArrayData, Int64Array, UInt32Array, UInt64Array}, + ffi::ArrowArray, + }; + use std::convert::TryFrom; + + fn test_round_trip(expected: &ArrayData) -> Result<()> { + // create a `ArrowArray` from the data. + let d1 = ArrowArray::try_from(expected.clone())?; + + // here we export the array as 2 pointers. We would have no control over ownership if it was not for + // the release mechanism. + let (array, schema) = ArrowArray::into_raw(d1); + + // simulate an external consumer by being the consumer + let d1 = unsafe { ArrowArray::try_from_raw(array, schema) }?; + + let result = &ArrayData::try_from(d1)?; + + assert_eq!(result, expected); + Ok(()) + } + + #[test] + fn test_u32() -> Result<()> { + let data = UInt32Array::from(vec![Some(2), None, Some(1), None]).data(); + test_round_trip(data.as_ref()) + } + + #[test] + fn test_u64() -> Result<()> { + let data = UInt64Array::from(vec![Some(2), None, Some(1), None]).data(); + test_round_trip(data.as_ref()) + } + + #[test] + fn test_i64() -> Result<()> { + let data = Int64Array::from(vec![Some(2), None, Some(1), None]).data(); + test_round_trip(data.as_ref()) + } +} diff --git a/rust/arrow/src/array/mod.rs b/rust/arrow/src/array/mod.rs index 99e7cb75efb46..fb0b3029acb82 100644 --- a/rust/arrow/src/array/mod.rs +++ b/rust/arrow/src/array/mod.rs @@ -95,6 +95,7 @@ mod cast; mod data; mod equal; mod equal_json; +mod ffi; mod iterator; mod null; mod ord; @@ -272,3 +273,7 @@ pub use self::cast::{ as_boolean_array, as_dictionary_array, as_null_array, as_primitive_array, as_string_array, }; + +// ------------------------------ C Data Interface --------------------------- + +pub use self::array::make_array_from_raw; diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 16a38826afea0..bac09e28a5d9c 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -21,12 +21,16 @@ #[cfg(feature = "simd")] use packed_simd::u8x64; +use crate::{ + bytes::{Bytes, Deallocation}, + ffi, +}; + use std::cmp; use std::convert::AsRef; -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; use std::mem; use std::ops::{BitAnd, BitOr, Not}; -use std::ptr::NonNull; use std::slice::{from_raw_parts, from_raw_parts_mut}; use std::sync::Arc; @@ -46,87 +50,12 @@ use std::borrow::BorrowMut; #[derive(Clone, PartialEq, Debug)] pub struct Buffer { /// Reference-counted pointer to the internal byte buffer. - data: Arc<BufferData>, + data: Arc<Bytes>, /// The offset into the buffer. offset: usize, } -struct BufferData { - /// The raw pointer into the buffer bytes - ptr: *const u8, - - /// The length (num of bytes) of the buffer. The region `[0, len)` of the buffer - /// is occupied with meaningful data, while the rest `[len, capacity)` is the - /// unoccupied region.
- len: usize, - - /// Whether this piece of memory is owned by this object - owned: bool, - - /// The capacity (num of bytes) of the buffer - /// Invariant: len <= capacity - capacity: usize, -} - -impl PartialEq for BufferData { - fn eq(&self, other: &BufferData) -> bool { - if self.len != other.len { - return false; - } - - self.data() == other.data() - } -} - -/// Release the underlying memory when the current buffer goes out of scope -impl Drop for BufferData { - fn drop(&mut self) { - if self.is_allocated() && self.owned { - unsafe { memory::free_aligned(self.ptr as *mut u8, self.capacity) }; - } - } -} - -impl Debug for BufferData { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!( - f, - "BufferData {{ ptr: {:?}, len: {}, capacity: {}, data: ", - self.ptr, self.len, self.capacity - )?; - - f.debug_list().entries(self.data().iter()).finish()?; - - write!(f, " }}") - } -} - -impl BufferData { - fn data(&self) -> &[u8] { - if !self.is_allocated() { - &[] - } else { - unsafe { std::slice::from_raw_parts(self.ptr, self.len) } - } - } - - fn is_zst(&self) -> bool { - self.ptr == BUFFER_INIT.as_ptr() as _ - } - - fn is_allocated(&self) -> bool { - !(self.is_zst() || self.ptr.is_null()) - } -} - -/// -/// SAFETY: (vcq): -/// As you can see this is global and lives as long as the program lives. -/// This is used for lazy allocation in the further steps of buffer allocations. -/// Pointer below is well-aligned, only used for ZSTs and discarded afterwards. -const BUFFER_INIT: NonNull<u8> = NonNull::dangling(); - impl Buffer { /// Creates a buffer from an existing memory region (must already be byte-aligned), this /// `Buffer` will free this piece of memory when dropped. @@ -142,7 +71,7 @@ impl Buffer { /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. pub unsafe fn from_raw_parts(ptr: *const u8, len: usize, capacity: usize) -> Self { - Buffer::build_with_arguments(ptr, len, capacity, true) + Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity)) } /// Creates a buffer from an existing memory region (must already be byte-aligned), this @@ -152,70 +81,52 @@ impl Buffer { /// /// * `ptr` - Pointer to raw parts /// * `len` - Length of raw parts in **bytes** - /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** + /// * `data` - An [ffi::FFI_ArrowArray] with the data /// /// # Safety /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` - /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. - pub unsafe fn from_unowned(ptr: *const u8, len: usize, capacity: usize) -> Self { - Buffer::build_with_arguments(ptr, len, capacity, false) + /// bytes and that the foreign deallocator frees the region. + pub unsafe fn from_unowned( + ptr: *const u8, + len: usize, + data: Arc<ffi::FFI_ArrowArray>, + ) -> Self { + Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data)) } - /// Creates a buffer from an existing memory region (must already be byte-aligned). - /// - /// # Arguments - /// - /// * `ptr` - Pointer to raw parts - /// * `len` - Length of raw parts in bytes - /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** - /// * `owned` - Whether the raw parts is owned by this `Buffer`. If true, this `Buffer` will - /// free this memory when dropped, otherwise it will skip freeing the raw parts.
- /// - /// # Safety - /// - /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` - /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. + /// Auxiliary method to create a new Buffer unsafe fn build_with_arguments( ptr: *const u8, len: usize, - capacity: usize, - owned: bool, + deallocation: Deallocation, ) -> Self { - assert!( - memory::is_aligned(ptr, memory::ALIGNMENT), - "memory not aligned" - ); - let buf_data = BufferData { - ptr, - len, - capacity, - owned, - }; + let bytes = Bytes::new(ptr, len, deallocation); Buffer { - data: Arc::new(buf_data), + data: Arc::new(bytes), offset: 0, } } /// Returns the number of bytes in the buffer pub fn len(&self) -> usize { - self.data.len - self.offset + self.data.len() - self.offset } - /// Returns the capacity of this buffer + /// Returns the capacity of this buffer. + /// For externally owned buffers, this returns zero pub fn capacity(&self) -> usize { - self.data.capacity + self.data.capacity() } /// Returns whether the buffer is empty. pub fn is_empty(&self) -> bool { - self.data.len - self.offset == 0 + self.data.len() - self.offset == 0 } /// Returns the byte slice stored in this buffer pub fn data(&self) -> &[u8] { - &self.data.data()[self.offset..] + &self.data.as_slice()[self.offset..] } /// Returns a slice of this buffer, starting from `offset`. @@ -235,7 +146,7 @@ impl Buffer { /// Note that this should be used cautiously, and the returned pointer should not be /// stored anywhere, to avoid dangling pointers. pub fn raw_data(&self) -> *const u8 { - unsafe { self.data.ptr.add(self.offset) } + unsafe { self.data.raw_data().add(self.offset) } } /// View buffer as typed slice. @@ -291,11 +202,6 @@ impl Buffer { count } - - /// Returns an empty buffer. - pub fn empty() -> Self { - unsafe { Self::from_raw_parts(BUFFER_INIT.as_ptr() as _, 0, 0) } - } } /// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly @@ -309,7 +215,7 @@ impl<T: AsRef<[u8]>> From<T> for Buffer { let buffer = memory::allocate_aligned(capacity); unsafe { memory::memcpy(buffer, slice.as_ptr(), len); - Buffer::build_with_arguments(buffer, len, capacity, true) + Buffer::build_with_arguments(buffer, len, Deallocation::Native(capacity)) } } } @@ -907,11 +813,8 @@ impl MutableBuffer { /// Freezes this buffer and return an immutable version of it. pub fn freeze(self) -> Buffer { - let buffer_data = BufferData { - ptr: self.data, - len: self.len, - capacity: self.capacity, - owned: true, + let buffer_data = unsafe { + Bytes::new(self.data, self.len, Deallocation::Native(self.capacity)) }; std::mem::forget(self); Buffer { diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs new file mode 100644 index 0000000000000..0363d8735a527 --- /dev/null +++ b/rust/arrow/src/bytes.rs @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains an implementation of a contiguous immutable memory region that knows +//! how to de-allocate itself, [`Bytes`]. +//! Note that this is a low-level functionality of this crate. + +use core::slice; +use std::sync::Arc; +use std::{fmt::Debug, fmt::Formatter}; + +use crate::{ffi, memory}; + +/// Mode of deallocating memory regions +pub enum Deallocation { + /// Native deallocation, using Rust deallocator with Arrow-specific memory alignment + Native(usize), + /// Foreign interface, via a callback + Foreign(Arc<ffi::FFI_ArrowArray>), +} + +impl Debug for Deallocation { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + Deallocation::Native(capacity) => { + write!(f, "Deallocation::Native {{ capacity: {} }}", capacity) + } + Deallocation::Foreign(_) => { + write!(f, "Deallocation::Foreign {{ capacity: unknown }}") + } + } + } +} + +/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself. +/// This struct's API is inspired by `bytes::Bytes`, but it is not limited to using Rust's +/// global allocator nor u8 alignment. +/// +/// In the most common case, this buffer is allocated using [`allocate_aligned`](memory::allocate_aligned) +/// and deallocated accordingly [`free_aligned`](memory::free_aligned). +/// When the region is allocated by a foreign allocator, [Deallocation::Foreign], this calls the +/// foreign deallocator to deallocate the region when it is no longer needed. +pub struct Bytes { + /// The raw pointer to the beginning of the region + ptr: *const u8, + + /// The number of bytes visible to this region. This is always smaller than its capacity (when available). + len: usize, + + /// how to deallocate this region + deallocation: Deallocation, +} + +impl Bytes { + /// Takes ownership of an allocated memory region, + /// + /// # Arguments + /// + /// * `ptr` - Pointer to raw parts + /// * `len` - Length of raw parts in **bytes** + /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes** + /// + /// # Safety + /// + /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` + /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. + pub unsafe fn new(ptr: *const u8, len: usize, deallocation: Deallocation) -> Bytes { + Bytes { + ptr, + len, + deallocation, + } + } + + #[inline] + pub fn as_slice(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.ptr, self.len) } + } + + #[inline] + pub fn len(&self) -> usize { + self.len + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + #[inline] + pub fn raw_data(&self) -> *const u8 { + self.ptr + } + + #[inline] + pub fn raw_data_mut(&mut self) -> *mut u8 { + self.ptr as *mut u8 + } + + pub fn capacity(&self) -> usize { + match self.deallocation { + Deallocation::Native(capacity) => capacity, + // we cannot determine this in general, + // and thus we state that this is externally-owned memory + Deallocation::Foreign(_) => 0, + } + } +} + +impl Drop for Bytes { + #[inline] + fn drop(&mut self) { + match &self.deallocation { + Deallocation::Native(capacity) => { + if !self.ptr.is_null() { + unsafe { memory::free_aligned(self.ptr as *mut u8, *capacity) }; + } + } + // foreign interface knows how to deallocate itself.
+ Deallocation::Foreign(_) => (), + } + } +} + +impl PartialEq for Bytes { + fn eq(&self, other: &Bytes) -> bool { + self.as_slice() == other.as_slice() + } +} + +impl Debug for Bytes { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?; + + f.debug_list().entries(self.as_slice().iter()).finish()?; + + write!(f, " }}") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dealloc_native() { + let capacity = 5; + let a = memory::allocate_aligned(capacity); + // create Bytes and release it. This will make `a` be an invalid pointer, but it is defined behavior + unsafe { Bytes::new(a, 3, Deallocation::Native(capacity)) }; + } +} diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs index 51b614ae44c4b..76c679d8278eb 100644 --- a/rust/arrow/src/error.rs +++ b/rust/arrow/src/error.rs @@ -35,6 +35,8 @@ pub enum ArrowError { IoError(String), InvalidArgumentError(String), ParquetError(String), + /// Error during import or export to/from the C Data Interface + CDataInterface(String), DictionaryKeyOverflowError, } @@ -103,6 +105,9 @@ impl Display for ArrowError { ArrowError::ParquetError(desc) => { write!(f, "Parquet argument error: {}", desc) } + ArrowError::CDataInterface(desc) => { + write!(f, "C Data interface error: {}", desc) + } ArrowError::DictionaryKeyOverflowError => { write!(f, "Dictionary key bigger than the key type") } diff --git a/rust/arrow/src/ffi.rs b/rust/arrow/src/ffi.rs new file mode 100644 index 0000000000000..22bf2a385b5c6 --- /dev/null +++ b/rust/arrow/src/ffi.rs @@ -0,0 +1,657 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains declarations to bind to the [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html). +//! +//! Generally, this module is divided in two main interfaces: +//! One interface maps C ABI to native Rust types, i.e. convert c-pointers, c_char, to native rust. +//! This is handled by [FFI_ArrowSchema] and [FFI_ArrowArray]. +//! +//! The second interface maps native Rust types to the Rust-specific implementation of Arrow such as `format` to [Datatype], +//! `Buffer`, etc. This is handled by [ArrowArray]. +//! +//! ```rust +//! # use std::sync::Arc; +//! # use arrow::array::{Int32Array, Array, ArrayData, make_array_from_raw}; +//! # use arrow::error::{Result, ArrowError}; +//! # use arrow::compute::kernels::arithmetic; +//! # use std::convert::TryFrom; +//! # fn main() -> Result<()> { +//! // create an array natively +//! let array = Int32Array::from(vec![Some(1), None, Some(3)]); +//! +//! // export it +//! let (array_ptr, schema_ptr) = array.to_raw()?; +//! +//! // consumed and used by something else... +//! +//! // import it +//! 
let array = unsafe { make_array_from_raw(array_ptr, schema_ptr)? }; +//! +//! // perform some operation +//! let array = array.as_any().downcast_ref::<Int32Array>().ok_or( +//! ArrowError::ParseError("Expects an int32".to_string()), +//! )?; +//! let array = arithmetic::add(&array, &array)?; +//! +//! // verify +//! assert_eq!(array, Int32Array::from(vec![Some(2), None, Some(6)])); +//! +//! // (drop/release) +//! Ok(()) +//! } +//! ``` + +/* +# Design: + +Main assumptions: +* A memory region is deallocated according to its own release mechanism. +* Rust shares memory regions between arrays. +* A memory region should be deallocated when no-one is using it. + +The design of this module is as follows: + +`ArrowArray` contains two `Arc`s, one per ABI-compatible `struct`, each containing data +according to the C Data Interface. These Arcs are used for ref counting of the structs +within Rust and lifetime management. + +Each ABI-compatible `struct` knows how to `drop` itself, calling `release`. + +To import an array, unsafely create an `ArrowArray` from two pointers using [ArrowArray::try_from_raw]. +To export an array, create an `ArrowArray` using [ArrowArray::try_new]. +*/ + +use std::{ffi::CStr, ffi::CString, iter, mem::size_of, ptr, sync::Arc}; + +use crate::buffer::Buffer; +use crate::datatypes::DataType; +use crate::error::{ArrowError, Result}; +use crate::util::bit_util; + +/// ABI-compatible struct for `ArrowSchema` from C Data Interface +/// See https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions +/// This was created by bindgen +#[repr(C)] +#[derive(Debug)] +pub struct FFI_ArrowSchema { + format: *const ::std::os::raw::c_char, + name: *const ::std::os::raw::c_char, + metadata: *const ::std::os::raw::c_char, + flags: i64, + n_children: i64, + children: *mut *mut FFI_ArrowSchema, + dictionary: *mut FFI_ArrowSchema, + release: ::std::option::Option<unsafe extern "C" fn(arg1: *mut FFI_ArrowSchema)>, + private_data: *mut ::std::os::raw::c_void, +} + +// callback used to drop [FFI_ArrowSchema] when it is exported. +unsafe extern "C" fn release_schema(schema: *mut FFI_ArrowSchema) { + let schema = &mut *schema; + + // take ownership back to release it. + CString::from_raw(schema.format as *mut std::os::raw::c_char); + + schema.release = None; +} + +impl FFI_ArrowSchema { + /// create a new [FFI_ArrowSchema] from a format. + fn new(format: &str) -> FFI_ArrowSchema { + // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema + FFI_ArrowSchema { + format: CString::new(format).unwrap().into_raw(), + name: std::ptr::null_mut(), + metadata: std::ptr::null_mut(), + flags: 0, + n_children: 0, + children: ptr::null_mut(), + dictionary: std::ptr::null_mut(), + release: Some(release_schema), + private_data: std::ptr::null_mut(), + } + } + + /// create an empty [FFI_ArrowSchema] + fn empty() -> Self { + Self { + format: std::ptr::null_mut(), + name: std::ptr::null_mut(), + metadata: std::ptr::null_mut(), + flags: 0, + n_children: 0, + children: ptr::null_mut(), + dictionary: std::ptr::null_mut(), + release: None, + private_data: std::ptr::null_mut(), + } + } + + /// returns the format of this schema. + pub fn format(&self) -> &str { + unsafe { CStr::from_ptr(self.format) } + .to_str() + .expect("The external API has a non-utf8 as format") + } +} + +impl Drop for FFI_ArrowSchema { + fn drop(&mut self) { + match self.release { + None => (), + Some(release) => unsafe { release(self) }, + }; + } +} + +/// maps a DataType `format` to a [DataType](arrow::datatypes::DataType).
+/// See https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings +fn to_datatype(format: &str) -> Result<DataType> { + Ok(match format { + "n" => DataType::Null, + "b" => DataType::Boolean, + "c" => DataType::Int8, + "C" => DataType::UInt8, + "s" => DataType::Int16, + "S" => DataType::UInt16, + "i" => DataType::Int32, + "I" => DataType::UInt32, + "l" => DataType::Int64, + "L" => DataType::UInt64, + "e" => DataType::Float16, + "f" => DataType::Float32, + "g" => DataType::Float64, + "z" => DataType::Binary, + "Z" => DataType::LargeBinary, + "u" => DataType::Utf8, + "U" => DataType::LargeUtf8, + _ => { + return Err(ArrowError::CDataInterface( + "The datatype \"{}\" is still not supported in Rust implementation" + .to_string(), + )) + } + }) +} + +/// the inverse of [to_datatype] +fn from_datatype(datatype: &DataType) -> Result<String> { + Ok(match datatype { + DataType::Null => "n", + DataType::Boolean => "b", + DataType::Int8 => "c", + DataType::UInt8 => "C", + DataType::Int16 => "s", + DataType::UInt16 => "S", + DataType::Int32 => "i", + DataType::UInt32 => "I", + DataType::Int64 => "l", + DataType::UInt64 => "L", + DataType::Float16 => "e", + DataType::Float32 => "f", + DataType::Float64 => "g", + DataType::Binary => "z", + DataType::LargeBinary => "Z", + DataType::Utf8 => "u", + DataType::LargeUtf8 => "U", + _ => { + return Err(ArrowError::CDataInterface( + "The datatype \"{:?}\" is still not supported in Rust implementation" + .to_string(), + )) + } + } + .to_string()) +} + +// returns the number of bits that buffer `i` (in the C data interface) is expected to have. +// This is set by the Arrow specification +fn bit_width(data_type: &DataType, i: usize) -> Result<usize> { + Ok(match (data_type, i) { + // the null buffer is bit sized + (_, 0) => 1, + // primitive types first buffer's size is given by the native types + (DataType::Boolean, 1) => 1, + (DataType::UInt8, 1) => size_of::<u8>() * 8, + (DataType::UInt16, 1) => size_of::<u16>() * 8, + (DataType::UInt32, 1) => size_of::<u32>() * 8, + (DataType::UInt64, 1) => size_of::<u64>() * 8, + (DataType::Int8, 1) => size_of::<i8>() * 8, + (DataType::Int16, 1) => size_of::<i16>() * 8, + (DataType::Int32, 1) => size_of::<i32>() * 8, + (DataType::Int64, 1) => size_of::<i64>() * 8, + (DataType::Float32, 1) => size_of::<f32>() * 8, + (DataType::Float64, 1) => size_of::<f64>() * 8, + // primitive types have a single buffer + (DataType::Boolean, _) | + (DataType::UInt8, _) | + (DataType::UInt16, _) | + (DataType::UInt32, _) | + (DataType::UInt64, _) | + (DataType::Int8, _) | + (DataType::Int16, _) | + (DataType::Int32, _) | + (DataType::Int64, _) | + (DataType::Float32, _) | + (DataType::Float64, _) => { + return Err(ArrowError::CDataInterface(format!( + "The datatype \"{:?}\" expects 2 buffers, but requested {}. Please verify that the C data interface is correctly implemented.", + data_type, i + ))) + } + // Variable-sized binaries: have two buffers. + // Utf8: first buffer is i32, second is in bytes + (DataType::Utf8, 1) => size_of::<i32>() * 8, + (DataType::Utf8, 2) => size_of::<u8>() * 8, + (DataType::Utf8, _) => { + return Err(ArrowError::CDataInterface(format!( + "The datatype \"{:?}\" expects 3 buffers, but requested {}.
Please verify that the C data interface is correctly implemented.", + data_type, i + ))) + } + _ => { + return Err(ArrowError::CDataInterface(format!( + "The datatype \"{:?}\" is still not supported in Rust implementation", + data_type + ))) + } + }) +} + +/// ABI-compatible struct for ArrowArray from C Data Interface +/// See https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions +/// This was created by bindgen +#[repr(C)] +#[derive(Debug)] +pub struct FFI_ArrowArray { + pub(crate) length: i64, + pub(crate) null_count: i64, + pub(crate) offset: i64, + pub(crate) n_buffers: i64, + pub(crate) n_children: i64, + pub(crate) buffers: *mut *const ::std::os::raw::c_void, + children: *mut *mut FFI_ArrowArray, + dictionary: *mut FFI_ArrowArray, + release: ::std::option::Option<unsafe extern "C" fn(arg1: *mut FFI_ArrowArray)>, + // When exported, this MUST contain everything that is owned by this array. + // for example, any buffer pointed to in `buffers` must be here, as well as the `buffers` pointer + // itself. + // In other words, everything in [FFI_ArrowArray] must be owned by `private_data` and can assume + // that they do not outlive `private_data`. + private_data: *mut ::std::os::raw::c_void, +} + +// callback used to drop [FFI_ArrowArray] when it is exported +unsafe extern "C" fn release_array(array: *mut FFI_ArrowArray) { + if array.is_null() { + return; + } + let array = &mut *array; + // take ownership of `private_data`, therefore dropping it + Box::from_raw(array.private_data as *mut PrivateData); + + array.release = None; +} + +struct PrivateData { + buffers: Vec<Option<Buffer>>, + buffers_ptr: Box<[*const std::os::raw::c_void]>, +} + +impl FFI_ArrowArray { + /// creates a new `FFI_ArrowArray` from existing data. + /// # Safety + /// This method releases `buffers`. Consumers of this struct *must* call `release` before + /// releasing this struct, or contents in `buffers` leak. + unsafe fn new( + length: i64, + null_count: i64, + offset: i64, + n_buffers: i64, + buffers: Vec<Option<Buffer>>, + ) -> Self { + let buffers_ptr = buffers + .iter() + .map(|maybe_buffer| match maybe_buffer { + // note that `raw_data` takes into account the buffer's offset + Some(b) => b.raw_data() as *const std::os::raw::c_void, + None => std::ptr::null(), + }) + .collect::<Box<[_]>>(); + let pointer = buffers_ptr.as_ptr() as *mut *const std::ffi::c_void; + + // create the private data owning everything. + // any other data must be added here, e.g. via a struct, to track lifetime. + let private_data = Box::new(PrivateData { + buffers, + buffers_ptr, + }); + + Self { + length, + null_count, + offset, + n_buffers, + n_children: 0, + buffers: pointer, + children: std::ptr::null_mut(), + dictionary: std::ptr::null_mut(), + release: Some(release_array), + private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, + } + } + + // create an empty `FFI_ArrowArray`, which can be used to import data into + fn empty() -> Self { + Self { + length: 0, + null_count: 0, + offset: 0, + n_buffers: 0, + n_children: 0, + buffers: std::ptr::null_mut(), + children: std::ptr::null_mut(), + dictionary: std::ptr::null_mut(), + release: None, + private_data: std::ptr::null_mut(), + } + } +} + +/// returns a new buffer corresponding to the index `i` of the FFI array. It may not exist (null pointer). +/// `bits` is the number of bits that the native type of this buffer has. +/// The size of the buffer will be `ceil(self.length * bits, 8)`. +/// # Panic +/// This function panics if `i` is larger than or equal to `n_buffers`.
+/// # Safety +/// This function assumes that `ceil(self.length * bits, 8)` is the size of the buffer +unsafe fn create_buffer( + array: Arc<FFI_ArrowArray>, + index: usize, + len: usize, +) -> Option<Buffer> { + if array.buffers.is_null() { + return None; + } + let buffers = array.buffers as *mut *const u8; + + assert!(index < array.n_buffers as usize); + let ptr = *buffers.add(index); + + if ptr.is_null() { + None + } else { + Some(Buffer::from_unowned(ptr, len, array)) + } +} + +impl Drop for FFI_ArrowArray { + fn drop(&mut self) { + match self.release { + None => (), + Some(release) => unsafe { release(self) }, + }; + } +} + +/// Struct used to move an Array from and to the C Data Interface. +/// Its main responsibility is to expose functionality that requires +/// both [FFI_ArrowArray] and [FFI_ArrowSchema]. +/// +/// This struct has two main paths: +/// +/// ## Import from the C Data Interface +/// * [ArrowArray::empty] to allocate memory to be filled by an external call +/// * [ArrowArray::try_from_raw] to consume two non-null allocated pointers +/// ## Export to the C Data Interface +/// * [ArrowArray::try_new] to create a new [ArrowArray] from Rust-specific information +/// * [ArrowArray::into_raw] to expose two pointers for [FFI_ArrowArray] and [FFI_ArrowSchema]. +/// +/// # Safety +/// Whoever creates this struct is responsible for releasing their resources. Specifically, +/// consumers *must* call [ArrowArray::into_raw] and take ownership of the individual pointers, +/// calling [FFI_ArrowArray::release] and [FFI_ArrowSchema::release] accordingly. +/// +/// Furthermore, this struct assumes that the incoming data agrees with the C data interface. +#[derive(Debug)] +pub struct ArrowArray { + // these are ref-counted because they can be shared by multiple buffers. + array: Arc<FFI_ArrowArray>, + schema: Arc<FFI_ArrowSchema>, +} + +impl ArrowArray { + /// creates a new `ArrowArray`. This is used to export to the C Data Interface. + /// # Safety + /// See safety of [ArrowArray] + pub unsafe fn try_new( + data_type: &DataType, + len: usize, + null_count: usize, + null_buffer: Option<Buffer>, + offset: usize, + buffers: Vec<Buffer>, + _child_data: Vec<ArrowArray>, + ) -> Result<Self> { + let format = from_datatype(data_type)?; + // * insert the null buffer at the start + // * make all others `Option`. + let new_buffers = iter::once(null_buffer) + .chain(buffers.iter().map(|b| Some(b.clone()))) + .collect::<Vec<_>>(); + + let schema = Arc::new(FFI_ArrowSchema::new(&format)); + let array = Arc::new(FFI_ArrowArray::new( + len as i64, + null_count as i64, + offset as i64, + new_buffers.len() as i64, + new_buffers, + )); + + Ok(ArrowArray { schema, array }) + } + + /// creates a new [ArrowArray] from two pointers. Used to import from the C Data Interface. + /// # Safety + /// See safety of [ArrowArray] + /// # Error + /// Errors if any of the pointers is null + pub unsafe fn try_from_raw( + array: *const FFI_ArrowArray, + schema: *const FFI_ArrowSchema, + ) -> Result<Self> { + if array.is_null() || schema.is_null() { + return Err(ArrowError::MemoryError( + "At least one of the pointers passed to `try_from_raw` is null" + .to_string(), + )); + }; + Ok(Self { + array: Arc::from_raw(array as *mut FFI_ArrowArray), + schema: Arc::from_raw(schema as *mut FFI_ArrowSchema), + }) + } + + /// creates a new empty [ArrowArray]. Used to import from the C Data Interface.
+ /// # Safety + /// See safety of [ArrowArray] + pub unsafe fn empty() -> Self { + let schema = Arc::new(FFI_ArrowSchema::empty()); + let array = Arc::new(FFI_ArrowArray::empty()); + ArrowArray { schema, array } + } + + /// exports [ArrowArray] to the C Data Interface + pub fn into_raw(this: ArrowArray) -> (*const FFI_ArrowArray, *const FFI_ArrowSchema) { + (Arc::into_raw(this.array), Arc::into_raw(this.schema)) + } + + /// returns the null bit buffer. + /// Rust implementation uses a buffer that is not part of the array of buffers. + /// The C Data interface's null buffer is part of the array of buffers. + pub fn null_bit_buffer(&self) -> Option<Buffer> { + // similar to `self.buffer_len(0)`, but without `Result`. + let buffer_len = bit_util::ceil(self.array.length as usize, 8); + + unsafe { create_buffer(self.array.clone(), 0, buffer_len) } + } + + /// Returns the length, in bytes, of the buffer `i` (indexed according to the C data interface) + // Rust implementation uses fixed-sized buffers, which require knowledge of their `len`. + // for variable-sized buffers, such as the second buffer of a stringArray, we need + // to fetch offset buffer's len to build the second buffer. + fn buffer_len(&self, i: usize) -> Result<usize> { + let data_type = &self.data_type()?; + + Ok(match (data_type, i) { + (DataType::Utf8, 1) => { + // the len of the offset buffer (buffer 1) equals length + 1 + let bits = bit_width(data_type, i)?; + bit_util::ceil((self.array.length as usize + 1) * bits, 8) + } + (DataType::Utf8, 2) => { + // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) + let len = self.buffer_len(1)?; + // first buffer is the null buffer => add(1) + // we assume that pointer is aligned for `i32`, as Utf8 uses `i32` offsets. + #[allow(clippy::cast_ptr_alignment)] + let offset_buffer = unsafe { + *(self.array.buffers as *mut *const u8).add(1) as *const i32 + }; + // get last offset + (unsafe { *offset_buffer.add(len / size_of::<i32>() - 1) }) as usize + } + // buffer len of primitive types + _ => { + let bits = bit_width(data_type, i)?; + bit_util::ceil(self.array.length as usize * bits, 8) + } + }) + } + + /// returns all buffers, as organized by Rust (i.e.
null buffer is skipped) + pub fn buffers(&self) -> Result<Vec<Buffer>> { + (0..self.array.n_buffers - 1) + .map(|index| { + // + 1: skip null buffer + let index = (index + 1) as usize; + + let len = self.buffer_len(index)?; + + unsafe { create_buffer(self.array.clone(), index, len) }.ok_or_else( + || { + ArrowError::CDataInterface(format!( + "The external buffer at position {} is null.", + index - 1 + )) + }, + ) + }) + .collect() + } + + /// the length of the array + pub fn len(&self) -> usize { + self.array.length as usize + } + + /// whether the array is empty + pub fn is_empty(&self) -> bool { + self.array.length == 0 + } + + /// the offset of the array + pub fn offset(&self) -> usize { + self.array.offset as usize + } + + /// the null count of the array + pub fn null_count(&self) -> usize { + self.array.null_count as usize + } + + /// the data_type as declared in the schema + pub fn data_type(&self) -> Result<DataType> { + to_datatype(self.schema.format()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::array::{make_array, Array, ArrayData, Int32Array, StringArray}; + use crate::compute::kernels; + use std::convert::TryFrom; + use std::sync::Arc; + + #[test] + fn test_round_trip() -> Result<()> { + // create an array natively + let array = Int32Array::from(vec![1, 2, 3]); + + // export it + let array = ArrowArray::try_from(array.data().as_ref().clone())?; + + // (simulate consumer) import it + let data = Arc::new(ArrayData::try_from(array)?); + let array = make_array(data); + + // perform some operation + let array = array.as_any().downcast_ref::<Int32Array>().unwrap(); + let array = kernels::arithmetic::add(&array, &array).unwrap(); + + // verify + assert_eq!(array, Int32Array::from(vec![2, 4, 6])); + + // (drop/release) + Ok(()) + } + // case with nulls is tested in the docs, through the example on this module. + + #[test] + fn test_string() -> Result<()> { + // create an array natively + let array = StringArray::from(vec![Some("a"), None, Some("aaa")]); + + // export it + let array = ArrowArray::try_from(array.data().as_ref().clone())?; + + // (simulate consumer) import it + let data = Arc::new(ArrayData::try_from(array)?); + let array = make_array(data); + + // perform some operation + let array = kernels::concat::concat(&[array.clone(), array]).unwrap(); + let array = array.as_any().downcast_ref::<StringArray>().unwrap(); + + // verify + let expected = StringArray::from(vec![ + Some("a"), + None, + Some("aaa"), + Some("a"), + None, + Some("aaa"), + ]); + assert_eq!(array, &expected); + + // (drop/release) + Ok(()) + } +} diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs index 1355c7201ce79..9c91d38566fd2 100644 --- a/rust/arrow/src/lib.rs +++ b/rust/arrow/src/lib.rs @@ -139,13 +139,16 @@ mod arch; pub mod array; pub mod bitmap; pub mod buffer; +pub mod bytes; pub mod compute; pub mod csv; pub mod datatypes; pub mod error; +pub mod ffi; pub mod ipc; pub mod json; pub mod memory; pub mod record_batch; pub mod tensor; pub mod util; +mod zz_memory_check; diff --git a/rust/arrow/src/memory.rs b/rust/arrow/src/memory.rs index 802498abd22c2..8bd334469ee44 100644 --- a/rust/arrow/src/memory.rs +++ b/rust/arrow/src/memory.rs @@ -18,9 +18,9 @@ //! Defines memory-related functions, such as allocate/deallocate/reallocate memory //! regions, cache and allocation alignments. -use std::alloc::Layout; use std::mem::align_of; use std::ptr::NonNull; +use std::{alloc::Layout, sync::atomic::AtomicIsize}; // NOTE: Below code is written for spatial/temporal prefetcher optimizations.
Memory allocation // should align well with usage pattern of cache access and block sizes on layers of storage levels from @@ -135,6 +135,9 @@ const FALLBACK_ALIGNMENT: usize = 1 << 6; /// If you use allocation methods shown here you won't have any problems. const BYPASS_PTR: NonNull<u8> = unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }; +// If this number is not zero after all objects have been `drop`, there is a memory leak +pub static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0); + pub fn allocate_aligned(size: usize) -> *mut u8 { unsafe { if size == 0 { @@ -143,6 +146,8 @@ pub fn allocate_aligned(size: usize) -> *mut u8 { // This will dodge allocator api for any type. BYPASS_PTR.as_ptr() } else { + ALLOCATIONS.fetch_add(size as isize, std::sync::atomic::Ordering::SeqCst); + let layout = Layout::from_size_align_unchecked(size, ALIGNMENT); std::alloc::alloc_zeroed(layout) } @@ -159,6 +164,7 @@ pub fn allocate_aligned(size: usize) -> *mut u8 { /// * size must be the same size that was used to allocate that block of memory, pub unsafe fn free_aligned(ptr: *mut u8, size: usize) { if ptr != BYPASS_PTR.as_ptr() { + ALLOCATIONS.fetch_sub(size as isize, std::sync::atomic::Ordering::SeqCst); std::alloc::dealloc(ptr, Layout::from_size_align_unchecked(size, ALIGNMENT)); } } @@ -184,6 +190,10 @@ pub unsafe fn reallocate(ptr: *mut u8, old_size: usize, new_size: usize) -> *mut return BYPASS_PTR.as_ptr(); } + ALLOCATIONS.fetch_add( + new_size as isize - old_size as isize, + std::sync::atomic::Ordering::SeqCst, + ); let new_ptr = std::alloc::realloc( ptr, Layout::from_size_align_unchecked(old_size, ALIGNMENT), @@ -241,6 +251,7 @@ mod tests { let p = allocate_aligned(1024); // make sure this is 64-byte aligned assert_eq!(0, (p as usize) % 64); + unsafe { free_aligned(p, 1024) }; } } @@ -257,5 +268,6 @@ mod tests { assert_eq!(true, is_aligned::<u8>(ptr, 1)); assert_eq!(false, is_aligned::<u8>(ptr, 2)); assert_eq!(false, is_aligned::<u8>(ptr, 4)); + unsafe { free_aligned(ptr.offset(-1), 10) }; } } diff --git a/rust/arrow/src/zz_memory_check.rs b/rust/arrow/src/zz_memory_check.rs new file mode 100644 index 0000000000000..70ec8ebdbdd2f --- /dev/null +++ b/rust/arrow/src/zz_memory_check.rs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This file is named like this so that it is the last one to be tested +// It contains no content; it has a single test that verifies that there is no memory leak +// on all unit-tests + +#[cfg(feature = "memory-check")] +mod tests { + use crate::memory::ALLOCATIONS; + + // verify that no allocation is left un-deallocated + #[test] + fn test_memory_check() { + unsafe { assert_eq!(ALLOCATIONS.load(std::sync::atomic::Ordering::SeqCst), 0) } + } +}
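As a rough illustration of how the new `ALLOCATIONS` counter behaves, here is a minimal sketch, assuming a single-threaded program (as the `memory-check` feature's description requires); the `main` function and the 1024-byte size are only for illustration:

```rust
use std::sync::atomic::Ordering;

use arrow::memory;

fn main() {
    // reading the global counter requires `unsafe` because it is a `static mut`
    let before = unsafe { memory::ALLOCATIONS.load(Ordering::SeqCst) };

    // allocating a non-zero region bumps the counter by the requested size
    let size = 1024;
    let ptr = memory::allocate_aligned(size);
    let during = unsafe { memory::ALLOCATIONS.load(Ordering::SeqCst) };
    assert_eq!(during - before, size as isize);

    // freeing with the same size brings the counter back to its baseline
    unsafe { memory::free_aligned(ptr, size) };
    let after = unsafe { memory::ALLOCATIONS.load(Ordering::SeqCst) };
    assert_eq!(after, before);
}
```

The `zz_memory_check` test relies on exactly this invariant: after every unit test has dropped its buffers, the counter must read zero.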