From c89cfafb16333b363428c2cfc3a97f8f505a4194 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 19 Dec 2016 22:33:34 -0500 Subject: [PATCH 1/7] Rearrange to-pandas deserialization to better permit reads into pre-allocated memory Change-Id: I9d0c56e907751c85661c95fc9349aef9651679d8 --- cpp/src/.clang-format => .clang-format | 0 cpp/src/.clang-tidy => .clang-tidy | 0 .../.clang-tidy-ignore => .clang-tidy-ignore | 0 cpp/CMakeLists.txt | 3 +- python/src/pyarrow/adapters/builtin.cc | 42 +- python/src/pyarrow/adapters/builtin.h | 4 +- python/src/pyarrow/adapters/pandas.cc | 642 +++++++++--------- python/src/pyarrow/adapters/pandas.h | 33 +- python/src/pyarrow/api.h | 2 +- python/src/pyarrow/common.cc | 4 +- python/src/pyarrow/common.h | 52 +- python/src/pyarrow/config.cc | 5 +- python/src/pyarrow/config.h | 6 +- python/src/pyarrow/helpers.cc | 3 +- python/src/pyarrow/helpers.h | 4 +- python/src/pyarrow/io.cc | 14 +- python/src/pyarrow/io.h | 6 +- python/src/pyarrow/numpy_interop.h | 4 +- python/src/pyarrow/util/datetime.h | 8 +- python/src/pyarrow/util/test_main.cc | 2 +- 20 files changed, 425 insertions(+), 409 deletions(-) rename cpp/src/.clang-format => .clang-format (100%) rename cpp/src/.clang-tidy => .clang-tidy (100%) rename cpp/src/.clang-tidy-ignore => .clang-tidy-ignore (100%) diff --git a/cpp/src/.clang-format b/.clang-format similarity index 100% rename from cpp/src/.clang-format rename to .clang-format diff --git a/cpp/src/.clang-tidy b/.clang-tidy similarity index 100% rename from cpp/src/.clang-tidy rename to .clang-tidy diff --git a/cpp/src/.clang-tidy-ignore b/.clang-tidy-ignore similarity index 100% rename from cpp/src/.clang-tidy-ignore rename to .clang-tidy-ignore diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 315995ce7cb..93e9853df89 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -741,7 +741,8 @@ endif (UNIX) if (${CLANG_FORMAT_FOUND}) # runs clang format and updates files in place. add_custom_target(format ${BUILD_SUPPORT_DIR}/run-clang-format.sh ${CMAKE_CURRENT_SOURCE_DIR} ${CLANG_FORMAT_BIN} 1 - `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/_generated/g'`) + `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/_generated/g'` + `find ${CMAKE_CURRENT_SOURCE_DIR}/../python -name \\*.cc -or -name \\*.h`) # runs clang format and exits with a non-zero exit code if any files need to be reformatted add_custom_target(check-format ${BUILD_SUPPORT_DIR}/run-clang-format.sh ${CMAKE_CURRENT_SOURCE_DIR} ${CLANG_FORMAT_BIN} 0 diff --git a/python/src/pyarrow/adapters/builtin.cc b/python/src/pyarrow/adapters/builtin.cc index 2a13944b35c..1567be95e2a 100644 --- a/python/src/pyarrow/adapters/builtin.cc +++ b/python/src/pyarrow/adapters/builtin.cc @@ -100,9 +100,7 @@ class ScalarVisitor { } } - int64_t total_count() const { - return total_count_; - } + int64_t total_count() const { return total_count_; } private: int64_t total_count_; @@ -123,17 +121,14 @@ static constexpr int MAX_NESTING_LEVELS = 32; class SeqVisitor { public: - SeqVisitor() : - max_nesting_level_(0) { + SeqVisitor() : max_nesting_level_(0) { memset(nesting_histogram_, 0, MAX_NESTING_LEVELS * sizeof(int)); } - Status Visit(PyObject* obj, int level=0) { + Status Visit(PyObject* obj, int level = 0) { Py_ssize_t size = PySequence_Size(obj); - if (level > max_nesting_level_) { - max_nesting_level_ = level; - } + if (level > max_nesting_level_) { max_nesting_level_ = level; } for (int64_t i = 0; i < size; ++i) { // TODO(wesm): Error checking? @@ -188,9 +183,7 @@ class SeqVisitor { int max_observed_level() const { int result = 0; for (int i = 0; i < MAX_NESTING_LEVELS; ++i) { - if (nesting_histogram_[i] > 0) { - result = i; - } + if (nesting_histogram_[i] > 0) { result = i; } } return result; } @@ -198,9 +191,7 @@ class SeqVisitor { int num_nesting_levels() const { int result = 0; for (int i = 0; i < MAX_NESTING_LEVELS; ++i) { - if (nesting_histogram_[i] > 0) { - ++result; - } + if (nesting_histogram_[i] > 0) { ++result; } } return result; } @@ -214,8 +205,8 @@ class SeqVisitor { }; // Non-exhaustive type inference -static Status InferArrowType(PyObject* obj, int64_t* size, - std::shared_ptr* out_type) { +static Status InferArrowType( + PyObject* obj, int64_t* size, std::shared_ptr* out_type) { *size = PySequence_Size(obj); if (PyErr_Occurred()) { // Not a sequence @@ -234,9 +225,7 @@ static Status InferArrowType(PyObject* obj, int64_t* size, *out_type = seq_visitor.GetType(); - if (*out_type == nullptr) { - return Status::TypeError("Unable to determine data type"); - } + if (*out_type == nullptr) { return Status::TypeError("Unable to determine data type"); } return Status::OK(); } @@ -337,7 +326,8 @@ class TimestampConverter : public TypedConverter { if (item.obj() == Py_None) { typed_builder_->AppendNull(); } else { - PyDateTime_DateTime* pydatetime = reinterpret_cast(item.obj()); + PyDateTime_DateTime* pydatetime = + reinterpret_cast(item.obj()); struct tm datetime = {0}; datetime.tm_year = PyDateTime_GET_YEAR(pydatetime) - 1900; datetime.tm_mon = PyDateTime_GET_MONTH(pydatetime) - 1; @@ -462,6 +452,7 @@ class ListConverter : public TypedConverter { } return Status::OK(); } + protected: std::shared_ptr value_converter_; }; @@ -496,8 +487,8 @@ Status ListConverter::Init(const std::shared_ptr& builder) { builder_ = builder; typed_builder_ = static_cast(builder.get()); - value_converter_ = GetConverter(static_cast( - builder->type().get())->value_type()); + value_converter_ = + GetConverter(static_cast(builder->type().get())->value_type()); if (value_converter_ == nullptr) { return Status::NotImplemented("value type not implemented"); } @@ -521,8 +512,7 @@ Status ConvertPySequence(PyObject* obj, std::shared_ptr* out) { std::shared_ptr converter = GetConverter(type); if (converter == nullptr) { std::stringstream ss; - ss << "No type converter implemented for " - << type->ToString(); + ss << "No type converter implemented for " << type->ToString(); return Status::NotImplemented(ss.str()); } @@ -536,4 +526,4 @@ Status ConvertPySequence(PyObject* obj, std::shared_ptr* out) { return builder->Finish(out); } -} // namespace pyarrow +} // namespace pyarrow diff --git a/python/src/pyarrow/adapters/builtin.h b/python/src/pyarrow/adapters/builtin.h index 2ddfdaaf441..1ff36945c88 100644 --- a/python/src/pyarrow/adapters/builtin.h +++ b/python/src/pyarrow/adapters/builtin.h @@ -40,6 +40,6 @@ namespace pyarrow { PYARROW_EXPORT arrow::Status ConvertPySequence(PyObject* obj, std::shared_ptr* out); -} // namespace pyarrow +} // namespace pyarrow -#endif // PYARROW_ADAPTERS_BUILTIN_H +#endif // PYARROW_ADAPTERS_BUILTIN_H diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 38f3b6f5248..5e27a43455a 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -30,8 +30,9 @@ #include #include "arrow/api.h" -#include "arrow/util/bit-util.h" #include "arrow/status.h" +#include "arrow/type_fwd.h" +#include "arrow/util/bit-util.h" #include "pyarrow/common.h" #include "pyarrow/config.h" @@ -44,6 +45,7 @@ using arrow::Column; using arrow::Field; using arrow::DataType; using arrow::Status; +using arrow::Table; namespace BitUtil = arrow::BitUtil; @@ -51,8 +53,7 @@ namespace BitUtil = arrow::BitUtil; // Serialization template -struct npy_traits { -}; +struct npy_traits {}; template <> struct npy_traits { @@ -60,21 +61,17 @@ struct npy_traits { using TypeClass = arrow::BooleanType; static constexpr bool supports_nulls = false; - static inline bool isnull(uint8_t v) { - return false; - } + static inline bool isnull(uint8_t v) { return false; } }; -#define NPY_INT_DECL(TYPE, CapType, T) \ - template <> \ - struct npy_traits { \ - typedef T value_type; \ - using TypeClass = arrow::CapType##Type; \ - \ - static constexpr bool supports_nulls = false; \ - static inline bool isnull(T v) { \ - return false; \ - } \ +#define NPY_INT_DECL(TYPE, CapType, T) \ + template <> \ + struct npy_traits { \ + typedef T value_type; \ + using TypeClass = arrow::CapType##Type; \ + \ + static constexpr bool supports_nulls = false; \ + static inline bool isnull(T v) { return false; } \ }; NPY_INT_DECL(INT8, Int8, int8_t); @@ -93,9 +90,7 @@ struct npy_traits { static constexpr bool supports_nulls = true; - static inline bool isnull(float v) { - return v != v; - } + static inline bool isnull(float v) { return v != v; } }; template <> @@ -105,9 +100,7 @@ struct npy_traits { static constexpr bool supports_nulls = true; - static inline bool isnull(double v) { - return v != v; - } + static inline bool isnull(double v) { return v != v; } }; template <> @@ -135,18 +128,14 @@ struct npy_traits { template class ArrowSerializer { public: - ArrowSerializer(arrow::MemoryPool* pool, PyArrayObject* arr, PyArrayObject* mask) : - pool_(pool), - arr_(arr), - mask_(mask) { + ArrowSerializer(arrow::MemoryPool* pool, PyArrayObject* arr, PyArrayObject* mask) + : pool_(pool), arr_(arr), mask_(mask) { length_ = PyArray_SIZE(arr_); } Status Convert(std::shared_ptr* out); - int stride() const { - return PyArray_STRIDES(arr_)[0]; - } + int stride() const { return PyArray_STRIDES(arr_)[0]; } Status InitNullBitmap() { int null_bytes = BitUtil::BytesForBits(length_); @@ -215,9 +204,7 @@ class ArrowSerializer { const int32_t length = PyBytes_GET_SIZE(obj); s = string_builder.Append(PyBytes_AS_STRING(obj), length); Py_DECREF(obj); - if (!s.ok()) { - return s; - } + if (!s.ok()) { return s; } } else if (PyBytes_Check(obj)) { have_bytes = true; const int32_t length = PyBytes_GET_SIZE(obj); @@ -259,8 +246,7 @@ class ArrowSerializer { } } - *out = std::make_shared(length_, data, null_count, - null_bitmap_); + *out = std::make_shared(length_, data, null_count, null_bitmap_); return Status::OK(); } @@ -321,26 +307,27 @@ inline Status ArrowSerializer::MakeDataType(std::shared_ptr* out } template <> -inline Status ArrowSerializer::MakeDataType(std::shared_ptr* out) { +inline Status ArrowSerializer::MakeDataType( + std::shared_ptr* out) { PyArray_Descr* descr = PyArray_DESCR(arr_); auto date_dtype = reinterpret_cast(descr->c_metadata); arrow::TimestampType::Unit unit; switch (date_dtype->meta.base) { - case NPY_FR_s: - unit = arrow::TimestampType::Unit::SECOND; - break; - case NPY_FR_ms: - unit = arrow::TimestampType::Unit::MILLI; - break; - case NPY_FR_us: - unit = arrow::TimestampType::Unit::MICRO; - break; - case NPY_FR_ns: - unit = arrow::TimestampType::Unit::NANO; - break; - default: - return Status::Invalid("Unknown NumPy datetime unit"); + case NPY_FR_s: + unit = arrow::TimestampType::Unit::SECOND; + break; + case NPY_FR_ms: + unit = arrow::TimestampType::Unit::MILLI; + break; + case NPY_FR_us: + unit = arrow::TimestampType::Unit::MICRO; + break; + case NPY_FR_ns: + unit = arrow::TimestampType::Unit::NANO; + break; + default: + return Status::Invalid("Unknown NumPy datetime unit"); } out->reset(new arrow::TimestampType(unit)); @@ -351,9 +338,7 @@ template inline Status ArrowSerializer::Convert(std::shared_ptr* out) { typedef npy_traits traits; - if (mask_ != nullptr || traits::supports_nulls) { - RETURN_NOT_OK(InitNullBitmap()); - } + if (mask_ != nullptr || traits::supports_nulls) { RETURN_NOT_OK(InitNullBitmap()); } int64_t null_count = 0; if (mask_ != nullptr) { @@ -429,9 +414,7 @@ inline Status ArrowSerializer::Convert(std::shared_ptr* out) template inline Status ArrowSerializer::ConvertData() { // TODO(wesm): strided arrays - if (is_strided()) { - return Status::Invalid("no support for strided data yet"); - } + if (is_strided()) { return Status::Invalid("no support for strided data yet"); } data_ = std::make_shared(arr_); return Status::OK(); @@ -439,9 +422,7 @@ inline Status ArrowSerializer::ConvertData() { template <> inline Status ArrowSerializer::ConvertData() { - if (is_strided()) { - return Status::Invalid("no support for strided data yet"); - } + if (is_strided()) { return Status::Invalid("no support for strided data yet"); } int nbytes = BitUtil::BytesForBits(length_); auto buffer = std::make_shared(pool_); @@ -453,9 +434,7 @@ inline Status ArrowSerializer::ConvertData() { memset(bitmap, 0, nbytes); for (int i = 0; i < length_; ++i) { - if (values[i] > 0) { - BitUtil::SetBit(bitmap, i); - } + if (values[i] > 0) { BitUtil::SetBit(bitmap, i); } } data_ = buffer; @@ -468,29 +447,24 @@ inline Status ArrowSerializer::ConvertData() { return Status::TypeError("NYI"); } +#define TO_ARROW_CASE(TYPE) \ + case NPY_##TYPE: { \ + ArrowSerializer converter(pool, arr, mask); \ + RETURN_NOT_OK(converter.Convert(out)); \ + } break; -#define TO_ARROW_CASE(TYPE) \ - case NPY_##TYPE: \ - { \ - ArrowSerializer converter(pool, arr, mask); \ - RETURN_NOT_OK(converter.Convert(out)); \ - } \ - break; - -Status PandasMaskedToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo, - std::shared_ptr* out) { +Status PandasMaskedToArrow( + arrow::MemoryPool* pool, PyObject* ao, PyObject* mo, std::shared_ptr* out) { PyArrayObject* arr = reinterpret_cast(ao); PyArrayObject* mask = nullptr; - if (mo != nullptr) { - mask = reinterpret_cast(mo); - } + if (mo != nullptr) { mask = reinterpret_cast(mo); } if (PyArray_NDIM(arr) != 1) { return Status::Invalid("only handle 1-dimensional arrays"); } - switch(PyArray_DESCR(arr)->type_num) { + switch (PyArray_DESCR(arr)->type_num) { TO_ARROW_CASE(BOOL); TO_ARROW_CASE(INT8); TO_ARROW_CASE(INT16); @@ -506,15 +480,13 @@ Status PandasMaskedToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo, TO_ARROW_CASE(OBJECT); default: std::stringstream ss; - ss << "unsupported type " << PyArray_DESCR(arr)->type_num - << std::endl; + ss << "unsupported type " << PyArray_DESCR(arr)->type_num << std::endl; return Status::NotImplemented(ss.str()); } return Status::OK(); } -Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao, - std::shared_ptr* out) { +Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao, std::shared_ptr* out) { return PandasMaskedToArrow(pool, ao, nullptr, out); } @@ -522,28 +494,27 @@ Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao, // Deserialization template -struct arrow_traits { -}; +struct arrow_traits {}; template <> struct arrow_traits { static constexpr int npy_type = NPY_BOOL; static constexpr bool supports_nulls = false; static constexpr bool is_boolean = true; - static constexpr bool is_pandas_numeric_not_nullable = false; - static constexpr bool is_pandas_numeric_nullable = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = false; }; -#define INT_DECL(TYPE) \ - template <> \ - struct arrow_traits { \ - static constexpr int npy_type = NPY_##TYPE; \ - static constexpr bool supports_nulls = false; \ - static constexpr double na_value = NAN; \ - static constexpr bool is_boolean = false; \ - static constexpr bool is_pandas_numeric_not_nullable = true; \ - static constexpr bool is_pandas_numeric_nullable = false; \ - typedef typename npy_traits::value_type T; \ +#define INT_DECL(TYPE) \ + template <> \ + struct arrow_traits { \ + static constexpr int npy_type = NPY_##TYPE; \ + static constexpr bool supports_nulls = false; \ + static constexpr double na_value = NAN; \ + static constexpr bool is_boolean = false; \ + static constexpr bool is_numeric_not_nullable = true; \ + static constexpr bool is_numeric_nullable = false; \ + typedef typename npy_traits::value_type T; \ }; INT_DECL(INT8); @@ -561,8 +532,8 @@ struct arrow_traits { static constexpr bool supports_nulls = true; static constexpr float na_value = NAN; static constexpr bool is_boolean = false; - static constexpr bool is_pandas_numeric_not_nullable = false; - static constexpr bool is_pandas_numeric_nullable = true; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; typedef typename npy_traits::value_type T; }; @@ -572,8 +543,8 @@ struct arrow_traits { static constexpr bool supports_nulls = true; static constexpr double na_value = NAN; static constexpr bool is_boolean = false; - static constexpr bool is_pandas_numeric_not_nullable = false; - static constexpr bool is_pandas_numeric_nullable = true; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; typedef typename npy_traits::value_type T; }; @@ -583,8 +554,8 @@ struct arrow_traits { static constexpr bool supports_nulls = true; static constexpr int64_t na_value = std::numeric_limits::min(); static constexpr bool is_boolean = false; - static constexpr bool is_pandas_numeric_not_nullable = false; - static constexpr bool is_pandas_numeric_nullable = true; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; typedef typename npy_traits::value_type T; }; @@ -594,8 +565,8 @@ struct arrow_traits { static constexpr bool supports_nulls = true; static constexpr int64_t na_value = std::numeric_limits::min(); static constexpr bool is_boolean = false; - static constexpr bool is_pandas_numeric_not_nullable = false; - static constexpr bool is_pandas_numeric_nullable = true; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; typedef typename npy_traits::value_type T; }; @@ -604,11 +575,10 @@ struct arrow_traits { static constexpr int npy_type = NPY_OBJECT; static constexpr bool supports_nulls = true; static constexpr bool is_boolean = false; - static constexpr bool is_pandas_numeric_not_nullable = false; - static constexpr bool is_pandas_numeric_nullable = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = false; }; - static inline PyObject* make_pystring(const uint8_t* data, int32_t length) { #if PY_MAJOR_VERSION >= 3 return PyUnicode_FromStringAndSize(reinterpret_cast(data), length); @@ -645,20 +615,10 @@ inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) } } -template class ArrowDeserializer { public: - ArrowDeserializer(const std::shared_ptr& col, PyObject* py_ref) : - col_(col), py_ref_(py_ref) {} - - Status Convert(PyObject** out) { - const std::shared_ptr data = col_->data(); - - RETURN_NOT_OK(ConvertValues(data)); - *out = reinterpret_cast(out_); - - return Status::OK(); - } + ArrowDeserializer(const std::shared_ptr& col, PyObject* py_ref) + : col_(col), data_(col->data()), py_ref_(py_ref) {} Status AllocateOutput(int type) { PyAcquireGIL lock; @@ -676,20 +636,29 @@ class ArrowDeserializer { return Status::OK(); } - Status OutputFromData(int type, void* data) { + template + Status ConvertValuesZeroCopy(int npy_type, std::shared_ptr arr) { + typedef typename arrow_traits::T T; + + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + + // Zero-Copy. We can pass the data pointer directly to NumPy. + void* data = const_cast(in_values); + PyAcquireGIL lock; // Zero-Copy. We can pass the data pointer directly to NumPy. npy_intp dims[1] = {col_->length()}; - out_ = reinterpret_cast(PyArray_SimpleNewFromData(1, dims, - type, data)); + out_ = + reinterpret_cast(PyArray_SimpleNewFromData(1, dims, npy_type, data)); if (out_ == NULL) { // Error occurred, trust that SimpleNew set the error state return Status::OK(); } - set_numpy_metadata(type, col_->type().get(), out_); + set_numpy_metadata(npy_type, col_->type().get(), out_); if (PyArray_SetBaseObject(out_, py_ref_) == -1) { // Error occurred, trust that SetBaseObject set the error state @@ -705,222 +674,294 @@ class ArrowDeserializer { return Status::OK(); } - template - Status ConvertValuesZeroCopy(std::shared_ptr arr) { - typedef typename arrow_traits::T T; + template + void ConvertIntegerWithNulls(double* out_values) { + for (int c = 0; c < data_->num_chunks(); c++) { + const std::shared_ptr arr = data_->chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + // Upcast to double, set NaN as appropriate - auto prim_arr = static_cast(arr.get()); - auto in_values = reinterpret_cast(prim_arr->data()->data()); + for (int i = 0; i < arr->length(); ++i) { + *out_values++ = prim_arr->IsNull(i) ? NAN : in_values[i]; + } + } + } - // Zero-Copy. We can pass the data pointer directly to NumPy. - void* data = const_cast(in_values); - int type = arrow_traits::npy_type; - RETURN_NOT_OK(OutputFromData(type, data)); + template + void ConvertIntegerNoNullsSameType(T* out_values) { + for (int c = 0; c < data_->num_chunks(); c++) { + const std::shared_ptr arr = data_->chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); + } + } + + template + void ConvertIntegerNoNullsCast(OutType* out_values) { + for (int c = 0; c < data_->num_chunks(); c++) { + const std::shared_ptr arr = data_->chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + for (int32_t i = 0; i < arr->length(); ++i) { + *out_values = in_values[i]; + } + } + } + + Status ConvertBooleanWithNulls(PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data_->num_chunks(); c++) { + const std::shared_ptr arr = data_->chunk(c); + auto bool_arr = static_cast(arr.get()); + + for (int64_t i = 0; i < arr->length(); ++i) { + if (bool_arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values++ = Py_None; + } else if (bool_arr->Value(i)) { + // True + Py_INCREF(Py_True); + *out_values++ = Py_True; + } else { + // False + Py_INCREF(Py_False); + *out_values++ = Py_False; + } + } + } + return Status::OK(); + } + + Status ConvertBooleanNoNulls(uint8_t* out_values) { + for (int c = 0; c < data_->num_chunks(); c++) { + const std::shared_ptr arr = data_->chunk(c); + auto bool_arr = static_cast(arr.get()); + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = static_cast(bool_arr->Value(i)); + } + } return Status::OK(); } - template - inline typename std::enable_if< - (T2 != arrow::Type::DATE) & arrow_traits::is_pandas_numeric_nullable, Status>::type - ConvertValues(const std::shared_ptr& data) { - typedef typename arrow_traits::T T; - size_t chunk_offset = 0; + Status ConvertStrings(PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data_->num_chunks(); c++) { + const std::shared_ptr arr = data_->chunk(c); + auto string_arr = static_cast(arr.get()); - if (data->num_chunks() == 1 && data->null_count() == 0) { - return ConvertValuesZeroCopy(data->chunk(0)); + const uint8_t* data_ptr; + int32_t length; + if (data_->null_count() > 0) { + for (int64_t i = 0; i < arr->length(); ++i) { + if (string_arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values = Py_None; + } else { + data_ptr = string_arr->GetValue(i, &length); + + *out_values = make_pystring(data_ptr, length); + if (*out_values == nullptr) { + return Status::UnknownError("String initialization failed"); + } + } + ++out_values; + } + } else { + for (int64_t i = 0; i < arr->length(); ++i) { + data_ptr = string_arr->GetValue(i, &length); + *out_values = make_pystring(data_ptr, length); + if (*out_values == nullptr) { + return Status::UnknownError("String initialization failed"); + } + ++out_values; + } + } } - RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); + return Status::OK(); + } - for (int c = 0; c < data->num_chunks(); c++) { - const std::shared_ptr arr = data->chunk(c); + template + void ConvertNumericNullable(T na_value, T* out_values) { + for (int c = 0; c < data_->num_chunks(); c++) { + const std::shared_ptr arr = data_->chunk(c); auto prim_arr = static_cast(arr.get()); auto in_values = reinterpret_cast(prim_arr->data()->data()); - auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; if (arr->null_count() > 0) { for (int64_t i = 0; i < arr->length(); ++i) { - out_values[i] = arr->IsNull(i) ? arrow_traits::na_value : in_values[i]; + *out_values++ = arr->IsNull(i) ? na_value : in_values[i]; } } else { memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); } - - chunk_offset += arr->length(); } - - return Status::OK(); } - template - inline typename std::enable_if< - T2 == arrow::Type::DATE, Status>::type - ConvertValues(const std::shared_ptr& data) { - typedef typename arrow_traits::T T; - size_t chunk_offset = 0; - - RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); - - for (int c = 0; c < data->num_chunks(); c++) { - const std::shared_ptr arr = data->chunk(c); + template + void ConvertDates(T na_value, T* out_values) { + for (int c = 0; c < data_->num_chunks(); c++) { + const std::shared_ptr arr = data_->chunk(c); auto prim_arr = static_cast(arr.get()); auto in_values = reinterpret_cast(prim_arr->data()->data()); - auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; for (int64_t i = 0; i < arr->length(); ++i) { // There are 1000 * 60 * 60 * 24 = 86400000ms in a day - out_values[i] = arr->IsNull(i) ? arrow_traits::na_value : in_values[i] / 86400000; + *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000; } + } + } + + // ---------------------------------------------------------------------- + // Allocate new array and deserialize. Can do a zero copy conversion for some + // types - chunk_offset += arr->length(); + Status Convert(PyObject** out) { +#define CONVERT_CASE(TYPE) \ + case arrow::Type::TYPE: { \ + RETURN_NOT_OK(ConvertValues()); \ + } break; + + switch (col_->type()->type) { + CONVERT_CASE(BOOL); + CONVERT_CASE(INT8); + CONVERT_CASE(INT16); + CONVERT_CASE(INT32); + CONVERT_CASE(INT64); + CONVERT_CASE(UINT8); + CONVERT_CASE(UINT16); + CONVERT_CASE(UINT32); + CONVERT_CASE(UINT64); + CONVERT_CASE(FLOAT); + CONVERT_CASE(DOUBLE); + CONVERT_CASE(STRING); + CONVERT_CASE(DATE); + CONVERT_CASE(TIMESTAMP); + default: + return Status::NotImplemented("Arrow type reading not implemented"); } +#undef CONVERT_CASE + + *out = reinterpret_cast(out_); return Status::OK(); } - // Integer specialization - template + template inline typename std::enable_if< - arrow_traits::is_pandas_numeric_not_nullable, Status>::type - ConvertValues(const std::shared_ptr& data) { - typedef typename arrow_traits::T T; - size_t chunk_offset = 0; + (TYPE != arrow::Type::DATE) & arrow_traits::is_numeric_nullable, Status>::type + ConvertValues() { + typedef typename arrow_traits::T T; + int npy_type = arrow_traits::npy_type; - if (data->num_chunks() == 1 && data->null_count() == 0) { - return ConvertValuesZeroCopy(data->chunk(0)); + if (data_->num_chunks() == 1 && data_->null_count() == 0) { + return ConvertValuesZeroCopy(npy_type, data_->chunk(0)); } - if (data->null_count() > 0) { - RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); + RETURN_NOT_OK(AllocateOutput(npy_type)); + auto out_values = reinterpret_cast(PyArray_DATA(out_)); + ConvertNumericNullable(arrow_traits::na_value, out_values); - for (int c = 0; c < data->num_chunks(); c++) { - const std::shared_ptr arr = data->chunk(c); - auto prim_arr = static_cast(arr.get()); - auto in_values = reinterpret_cast(prim_arr->data()->data()); - // Upcast to double, set NaN as appropriate - auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + return Status::OK(); + } - for (int i = 0; i < arr->length(); ++i) { - out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i]; - } + template + inline typename std::enable_if::type + ConvertValues() { + typedef typename arrow_traits::T T; - chunk_offset += arr->length(); - } - } else { - RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); - - for (int c = 0; c < data->num_chunks(); c++) { - const std::shared_ptr arr = data->chunk(c); - auto prim_arr = static_cast(arr.get()); - auto in_values = reinterpret_cast(prim_arr->data()->data()); - auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; + RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); + auto out_values = reinterpret_cast(PyArray_DATA(out_)); + ConvertDates(arrow_traits::na_value, out_values); + return Status::OK(); + } - memcpy(out_values, in_values, sizeof(T) * arr->length()); + // Integer specialization + template + inline + typename std::enable_if::is_numeric_not_nullable, Status>::type + ConvertValues() { + typedef typename arrow_traits::T T; + int npy_type = arrow_traits::npy_type; + + if (data_->num_chunks() == 1 && data_->null_count() == 0) { + return ConvertValuesZeroCopy(npy_type, data_->chunk(0)); + } - chunk_offset += arr->length(); - } + if (data_->null_count() > 0) { + RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); + auto out_values = reinterpret_cast(PyArray_DATA(out_)); + ConvertIntegerWithNulls(out_values); + } else { + RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); + auto out_values = reinterpret_cast(PyArray_DATA(out_)); + ConvertIntegerNoNullsSameType(out_values); } return Status::OK(); } // Boolean specialization - template - inline typename std::enable_if< - arrow_traits::is_boolean, Status>::type - ConvertValues(const std::shared_ptr& data) { - size_t chunk_offset = 0; + template + inline typename std::enable_if::is_boolean, Status>::type + ConvertValues() { PyAcquireGIL lock; - - if (data->null_count() > 0) { + if (data_->null_count() > 0) { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); - - for (int c = 0; c < data->num_chunks(); c++) { - const std::shared_ptr arr = data->chunk(c); - auto bool_arr = static_cast(arr.get()); - auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; - - for (int64_t i = 0; i < arr->length(); ++i) { - if (bool_arr->IsNull(i)) { - Py_INCREF(Py_None); - out_values[i] = Py_None; - } else if (bool_arr->Value(i)) { - // True - Py_INCREF(Py_True); - out_values[i] = Py_True; - } else { - // False - Py_INCREF(Py_False); - out_values[i] = Py_False; - } - } - - chunk_offset += bool_arr->length(); - } + auto out_values = reinterpret_cast(PyArray_DATA(out_)); + RETURN_NOT_OK(ConvertBooleanWithNulls(out_values)); } else { RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); - - for (int c = 0; c < data->num_chunks(); c++) { - const std::shared_ptr arr = data->chunk(c); - auto bool_arr = static_cast(arr.get()); - auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; - - for (int64_t i = 0; i < arr->length(); ++i) { - out_values[i] = static_cast(bool_arr->Value(i)); - } - - chunk_offset += bool_arr->length(); - } + auto out_values = reinterpret_cast(PyArray_DATA(out_)); + RETURN_NOT_OK(ConvertBooleanNoNulls(out_values)); } - return Status::OK(); } // UTF8 strings - template - inline typename std::enable_if< - T2 == arrow::Type::STRING, Status>::type - ConvertValues(const std::shared_ptr& data) { - size_t chunk_offset = 0; + template + inline typename std::enable_if::type + ConvertValues() { PyAcquireGIL lock; - RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); + auto out_values = reinterpret_cast(PyArray_DATA(out_)); + return ConvertStrings(out_values); + } - for (int c = 0; c < data->num_chunks(); c++) { - const std::shared_ptr arr = data->chunk(c); - auto string_arr = static_cast(arr.get()); - auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; - - const uint8_t* data_ptr; - int32_t length; - if (data->null_count() > 0) { - for (int64_t i = 0; i < arr->length(); ++i) { - if (string_arr->IsNull(i)) { - Py_INCREF(Py_None); - out_values[i] = Py_None; - } else { - data_ptr = string_arr->GetValue(i, &length); - - out_values[i] = make_pystring(data_ptr, length); - if (out_values[i] == nullptr) { - return Status::UnknownError("String initialization failed"); - } - } - } - } else { - for (int64_t i = 0; i < arr->length(); ++i) { - data_ptr = string_arr->GetValue(i, &length); - out_values[i] = make_pystring(data_ptr, length); - if (out_values[i] == nullptr) { - return Status::UnknownError("String initialization failed"); - } - } - } - - chunk_offset += string_arr->length(); - } - + // ---------------------------------------------------------------------- + // Deserialize into pre-allocated memory + + Status ConvertPreallocated(PyArray_Descr* dtype, void* out) { +#define CONVERT_CASE(TYPE) \ + case arrow::Type::TYPE: { \ + RETURN_NOT_OK(ConvertValues()); \ + } break; + +// switch (col_->type()->type) { +// CONVERT_CASE(BOOL); +// CONVERT_CASE(INT8); +// CONVERT_CASE(INT16); +// CONVERT_CASE(INT32); +// CONVERT_CASE(INT64); +// CONVERT_CASE(UINT8); +// CONVERT_CASE(UINT16); +// CONVERT_CASE(UINT32); +// CONVERT_CASE(UINT64); +// CONVERT_CASE(FLOAT); +// CONVERT_CASE(DOUBLE); +// CONVERT_CASE(STRING); +// CONVERT_CASE(DATE); +// CONVERT_CASE(TIMESTAMP); +// default: +// return Status::NotImplemented("Arrow type reading not implemented"); +// } + +#undef CONVERT_CASE return Status::OK(); } @@ -974,48 +1015,35 @@ class ArrowDeserializer { private: std::shared_ptr col_; + std::shared_ptr data_; PyObject* py_ref_; PyArrayObject* out_; }; -#define FROM_ARROW_CASE(TYPE) \ - case arrow::Type::TYPE: \ - { \ - ArrowDeserializer converter(col, py_ref); \ - return converter.Convert(out); \ - } \ - break; - -Status ConvertArrayToPandas(const std::shared_ptr& arr, PyObject* py_ref, - PyObject** out) { +Status ConvertArrayToPandas( + const std::shared_ptr& arr, PyObject* py_ref, PyObject** out) { static std::string dummy_name = "dummy"; auto field = std::make_shared(dummy_name, arr->type()); auto col = std::make_shared(field, arr); return ConvertColumnToPandas(col, py_ref, out); } -Status ConvertColumnToPandas(const std::shared_ptr& col, PyObject* py_ref, - PyObject** out) { - switch(col->type()->type) { - FROM_ARROW_CASE(BOOL); - FROM_ARROW_CASE(INT8); - FROM_ARROW_CASE(INT16); - FROM_ARROW_CASE(INT32); - FROM_ARROW_CASE(INT64); - FROM_ARROW_CASE(UINT8); - FROM_ARROW_CASE(UINT16); - FROM_ARROW_CASE(UINT32); - FROM_ARROW_CASE(UINT64); - FROM_ARROW_CASE(FLOAT); - FROM_ARROW_CASE(DOUBLE); - FROM_ARROW_CASE(BINARY); - FROM_ARROW_CASE(STRING); - FROM_ARROW_CASE(DATE); - FROM_ARROW_CASE(TIMESTAMP); - default: - return Status::NotImplemented("Arrow type reading not implemented"); - } +Status ConvertColumnToPandas( + const std::shared_ptr& col, PyObject* py_ref, PyObject** out) { + ArrowDeserializer converter(col, py_ref); + return converter.Convert(out); +} + +Status ConvertTableToPandas( + const std::shared_ptr& table, int nthreads, PyObject** out) { + // Construct the exact pandas 0.x "BlockManager" memory layout + // + // * For each column determine the correct output pandas type + // * Allocate 2D blocks (ncols x nrows) for each distinct data type in output + // * Allocate block placement arrays + // * Write Arrow columns out into each slice of memory; populate block + // * placement arrays as we go return Status::OK(); } -} // namespace pyarrow +} // namespace pyarrow diff --git a/python/src/pyarrow/adapters/pandas.h b/python/src/pyarrow/adapters/pandas.h index 532495dd792..60dadd473ad 100644 --- a/python/src/pyarrow/adapters/pandas.h +++ b/python/src/pyarrow/adapters/pandas.h @@ -33,27 +33,42 @@ class Array; class Column; class MemoryPool; class Status; +class Table; -} // namespace arrow +} // namespace arrow namespace pyarrow { PYARROW_EXPORT -arrow::Status ConvertArrayToPandas(const std::shared_ptr& arr, - PyObject* py_ref, PyObject** out); +arrow::Status ConvertArrayToPandas( + const std::shared_ptr& arr, PyObject* py_ref, PyObject** out); PYARROW_EXPORT -arrow::Status ConvertColumnToPandas(const std::shared_ptr& col, - PyObject* py_ref, PyObject** out); +arrow::Status ConvertColumnToPandas( + const std::shared_ptr& col, PyObject* py_ref, PyObject** out); + +struct PandasOptions { + bool strings_to_categorical; +}; + +// Convert a whole table as efficiently as possible to a pandas.DataFrame. +// +// The returned Python object is a list of tuples consisting of the exact 2D +// BlockManager structure of the pandas.DataFrame used as of pandas 0.19.x. +// +// tuple item: (indices: ndarray[int32], block: ndarray[TYPE, ndim=2]) +PYARROW_EXPORT +arrow::Status ConvertTableToPandas( + const std::shared_ptr& table, int nthreads, PyObject** out); PYARROW_EXPORT arrow::Status PandasMaskedToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo, std::shared_ptr* out); PYARROW_EXPORT -arrow::Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao, - std::shared_ptr* out); +arrow::Status PandasToArrow( + arrow::MemoryPool* pool, PyObject* ao, std::shared_ptr* out); -} // namespace pyarrow +} // namespace pyarrow -#endif // PYARROW_ADAPTERS_PANDAS_H +#endif // PYARROW_ADAPTERS_PANDAS_H diff --git a/python/src/pyarrow/api.h b/python/src/pyarrow/api.h index 6dbbc45d40c..f65cc097f54 100644 --- a/python/src/pyarrow/api.h +++ b/python/src/pyarrow/api.h @@ -23,4 +23,4 @@ #include "pyarrow/adapters/builtin.h" #include "pyarrow/adapters/pandas.h" -#endif // PYARROW_API_H +#endif // PYARROW_API_H diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc index fb4d3496ac7..8660ac8f0ce 100644 --- a/python/src/pyarrow/common.cc +++ b/python/src/pyarrow/common.cc @@ -73,7 +73,7 @@ arrow::MemoryPool* get_memory_pool() { PyBytesBuffer::PyBytesBuffer(PyObject* obj) : Buffer(reinterpret_cast(PyBytes_AS_STRING(obj)), - PyBytes_GET_SIZE(obj)), + PyBytes_GET_SIZE(obj)), obj_(obj) { Py_INCREF(obj_); } @@ -83,4 +83,4 @@ PyBytesBuffer::~PyBytesBuffer() { Py_DECREF(obj_); } -} // namespace pyarrow +} // namespace pyarrow diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h index 7e3382634a7..639918d309f 100644 --- a/python/src/pyarrow/common.h +++ b/python/src/pyarrow/common.h @@ -24,7 +24,9 @@ #include "arrow/buffer.h" #include "arrow/util/macros.h" -namespace arrow { class MemoryPool; } +namespace arrow { +class MemoryPool; +} namespace pyarrow { @@ -34,27 +36,18 @@ class OwnedRef { public: OwnedRef() : obj_(nullptr) {} - OwnedRef(PyObject* obj) : - obj_(obj) {} + OwnedRef(PyObject* obj) : obj_(obj) {} - ~OwnedRef() { - Py_XDECREF(obj_); - } + ~OwnedRef() { Py_XDECREF(obj_); } void reset(PyObject* obj) { - if (obj_ != nullptr) { - Py_XDECREF(obj_); - } + if (obj_ != nullptr) { Py_XDECREF(obj_); } obj_ = obj; } - void release() { - obj_ = nullptr; - } + void release() { obj_ = nullptr; } - PyObject* obj() const{ - return obj_; - } + PyObject* obj() const { return obj_; } private: PyObject* obj_; @@ -78,13 +71,10 @@ struct PyObjectStringify { class PyGILGuard { public: - PyGILGuard() { - state_ = PyGILState_Ensure(); - } + PyGILGuard() { state_ = PyGILState_Ensure(); } + + ~PyGILGuard() { PyGILState_Release(state_); } - ~PyGILGuard() { - PyGILState_Release(state_); - } private: PyGILState_STATE state_; DISALLOW_COPY_AND_ASSIGN(PyGILGuard); @@ -108,8 +98,7 @@ PYARROW_EXPORT arrow::MemoryPool* get_memory_pool(); class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer { public: - NumPyBuffer(PyArrayObject* arr) - : Buffer(nullptr, 0) { + NumPyBuffer(PyArrayObject* arr) : Buffer(nullptr, 0) { arr_ = arr; Py_INCREF(arr); @@ -118,9 +107,7 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer { capacity_ = size_; } - virtual ~NumPyBuffer() { - Py_XDECREF(arr_); - } + virtual ~NumPyBuffer() { Py_XDECREF(arr_); } private: PyArrayObject* arr_; @@ -135,22 +122,17 @@ class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer { PyObject* obj_; }; - class PyAcquireGIL { public: - PyAcquireGIL() { - state_ = PyGILState_Ensure(); - } + PyAcquireGIL() { state_ = PyGILState_Ensure(); } - ~PyAcquireGIL() { - PyGILState_Release(state_); - } + ~PyAcquireGIL() { PyGILState_Release(state_); } private: PyGILState_STATE state_; DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL); }; -} // namespace pyarrow +} // namespace pyarrow -#endif // PYARROW_COMMON_H +#endif // PYARROW_COMMON_H diff --git a/python/src/pyarrow/config.cc b/python/src/pyarrow/config.cc index 730d2db99a5..e1002bf4fd1 100644 --- a/python/src/pyarrow/config.cc +++ b/python/src/pyarrow/config.cc @@ -21,8 +21,7 @@ namespace pyarrow { -void pyarrow_init() { -} +void pyarrow_init() {} PyObject* numpy_nan = nullptr; @@ -31,4 +30,4 @@ void pyarrow_set_numpy_nan(PyObject* obj) { numpy_nan = obj; } -} // namespace pyarrow +} // namespace pyarrow diff --git a/python/src/pyarrow/config.h b/python/src/pyarrow/config.h index 82936b1a5f3..386ee4b1e25 100644 --- a/python/src/pyarrow/config.h +++ b/python/src/pyarrow/config.h @@ -24,7 +24,7 @@ #include "pyarrow/visibility.h" #if PY_MAJOR_VERSION >= 3 - #define PyString_Check PyUnicode_Check +#define PyString_Check PyUnicode_Check #endif namespace pyarrow { @@ -38,6 +38,6 @@ void pyarrow_init(); PYARROW_EXPORT void pyarrow_set_numpy_nan(PyObject* obj); -} // namespace pyarrow +} // namespace pyarrow -#endif // PYARROW_CONFIG_H +#endif // PYARROW_CONFIG_H diff --git a/python/src/pyarrow/helpers.cc b/python/src/pyarrow/helpers.cc index b42199c8e04..bc71175762b 100644 --- a/python/src/pyarrow/helpers.cc +++ b/python/src/pyarrow/helpers.cc @@ -23,7 +23,6 @@ using namespace arrow; namespace pyarrow { - #define GET_PRIMITIVE_TYPE(NAME, FACTORY) \ case Type::NAME: \ return FACTORY(); \ @@ -55,4 +54,4 @@ std::shared_ptr GetPrimitiveType(Type::type type) { } } -} // namespace pyarrow +} // namespace pyarrow diff --git a/python/src/pyarrow/helpers.h b/python/src/pyarrow/helpers.h index 8334d974c02..788c3eedddf 100644 --- a/python/src/pyarrow/helpers.h +++ b/python/src/pyarrow/helpers.h @@ -31,6 +31,6 @@ using arrow::Type; PYARROW_EXPORT std::shared_ptr GetPrimitiveType(Type::type type); -} // namespace pyarrow +} // namespace pyarrow -#endif // PYARROW_HELPERS_H +#endif // PYARROW_HELPERS_H diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc index 12f5ba0bf2b..ac1aa635b40 100644 --- a/python/src/pyarrow/io.cc +++ b/python/src/pyarrow/io.cc @@ -33,8 +33,7 @@ namespace pyarrow { // ---------------------------------------------------------------------- // Python file -PythonFile::PythonFile(PyObject* file) - : file_(file) { +PythonFile::PythonFile(PyObject* file) : file_(file) { Py_INCREF(file_); } @@ -81,8 +80,8 @@ Status PythonFile::Read(int64_t nbytes, PyObject** out) { } Status PythonFile::Write(const uint8_t* data, int64_t nbytes) { - PyObject* py_data = PyBytes_FromStringAndSize( - reinterpret_cast(data), nbytes); + PyObject* py_data = + PyBytes_FromStringAndSize(reinterpret_cast(data), nbytes); ARROW_RETURN_NOT_OK(CheckPyError()); PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data); @@ -102,7 +101,7 @@ Status PythonFile::Tell(int64_t* position) { // PyLong_AsLongLong can raise OverflowError ARROW_RETURN_NOT_OK(CheckPyError()); - return Status::OK(); + return Status::OK(); } // ---------------------------------------------------------------------- @@ -156,7 +155,8 @@ Status PyReadableFile::Read(int64_t nbytes, std::shared_ptr* out) Status PyReadableFile::GetSize(int64_t* size) { PyGILGuard lock; - int64_t current_position;; + int64_t current_position; + ; ARROW_RETURN_NOT_OK(file_->Tell(¤t_position)); ARROW_RETURN_NOT_OK(file_->Seek(0, 2)); @@ -204,7 +204,7 @@ Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) { PyBytesReader::PyBytesReader(PyObject* obj) : arrow::io::BufferReader(reinterpret_cast(PyBytes_AS_STRING(obj)), - PyBytes_GET_SIZE(obj)), + PyBytes_GET_SIZE(obj)), obj_(obj) { Py_INCREF(obj_); } diff --git a/python/src/pyarrow/io.h b/python/src/pyarrow/io.h index e14aa8cfb27..fd3e7c08872 100644 --- a/python/src/pyarrow/io.h +++ b/python/src/pyarrow/io.h @@ -24,7 +24,9 @@ #include "pyarrow/config.h" #include "pyarrow/visibility.h" -namespace arrow { class MemoryPool; } +namespace arrow { +class MemoryPool; +} namespace pyarrow { @@ -92,6 +94,6 @@ class PYARROW_EXPORT PyBytesReader : public arrow::io::BufferReader { // TODO(wesm): seekable output files -} // namespace pyarrow +} // namespace pyarrow #endif // PYARROW_IO_H diff --git a/python/src/pyarrow/numpy_interop.h b/python/src/pyarrow/numpy_interop.h index 882d287c7c5..6326527a674 100644 --- a/python/src/pyarrow/numpy_interop.h +++ b/python/src/pyarrow/numpy_interop.h @@ -53,6 +53,6 @@ inline int import_numpy() { return 0; } -} // namespace pyarrow +} // namespace pyarrow -#endif // PYARROW_NUMPY_INTEROP_H +#endif // PYARROW_NUMPY_INTEROP_H diff --git a/python/src/pyarrow/util/datetime.h b/python/src/pyarrow/util/datetime.h index b67accc388f..9ffa6910524 100644 --- a/python/src/pyarrow/util/datetime.h +++ b/python/src/pyarrow/util/datetime.h @@ -22,8 +22,8 @@ #include namespace pyarrow { - -inline int64_t PyDate_to_ms(PyDateTime_Date* pydate) { + +inline int64_t PyDate_to_ms(PyDateTime_Date* pydate) { struct tm date = {0}; date.tm_year = PyDateTime_GET_YEAR(pydate) - 1900; date.tm_mon = PyDateTime_GET_MONTH(pydate) - 1; @@ -35,6 +35,6 @@ inline int64_t PyDate_to_ms(PyDateTime_Date* pydate) { return lrint(difftime(mktime(&date), mktime(&epoch)) * 1000); } -} // namespace pyarrow +} // namespace pyarrow -#endif // PYARROW_UTIL_DATETIME_H +#endif // PYARROW_UTIL_DATETIME_H diff --git a/python/src/pyarrow/util/test_main.cc b/python/src/pyarrow/util/test_main.cc index 00139f36742..6fb7c0536ee 100644 --- a/python/src/pyarrow/util/test_main.cc +++ b/python/src/pyarrow/util/test_main.cc @@ -17,7 +17,7 @@ #include -int main(int argc, char **argv) { +int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); int ret = RUN_ALL_TESTS(); From 4928a0bd40534a1859b3944623b0df15d74e2024 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 21 Dec 2016 14:44:38 -0500 Subject: [PATCH 2/7] Refactor post rebase Change-Id: I99d5bb640ca5327e501aa102b11a6e6958359fbb --- python/src/pyarrow/adapters/pandas.cc | 128 ++++++++++---------------- 1 file changed, 49 insertions(+), 79 deletions(-) diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 5e27a43455a..60e84d3802f 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -579,13 +579,35 @@ struct arrow_traits { static constexpr bool is_numeric_nullable = false; }; -static inline PyObject* make_pystring(const uint8_t* data, int32_t length) { +template <> +struct arrow_traits { + static constexpr int npy_type = NPY_OBJECT; + static constexpr bool supports_nulls = true; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = false; +}; + +template +struct WrapBytes {}; + +template <> +struct WrapBytes { + static inline PyObject* Wrap(const uint8_t* data, int64_t length) { #if PY_MAJOR_VERSION >= 3 - return PyUnicode_FromStringAndSize(reinterpret_cast(data), length); + return PyUnicode_FromStringAndSize(reinterpret_cast(data), length); #else - return PyString_FromStringAndSize(reinterpret_cast(data), length); + return PyString_FromStringAndSize(reinterpret_cast(data), length); #endif -} + } +}; + +template <> +struct WrapBytes { + static inline PyObject* Wrap(const uint8_t* data, int64_t length) { + return PyBytes_FromStringAndSize(reinterpret_cast(data), length); + } +}; inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) { if (type == NPY_DATETIME) { @@ -747,41 +769,29 @@ class ArrowDeserializer { return Status::OK(); } - Status ConvertStrings(PyObject** out_values) { + template + Status ConvertBinaryLike(PyObject** out_values) { PyAcquireGIL lock; for (int c = 0; c < data_->num_chunks(); c++) { - const std::shared_ptr arr = data_->chunk(c); - auto string_arr = static_cast(arr.get()); + auto arr = static_cast(data_->chunk(c).get()); const uint8_t* data_ptr; int32_t length; - if (data_->null_count() > 0) { - for (int64_t i = 0; i < arr->length(); ++i) { - if (string_arr->IsNull(i)) { - Py_INCREF(Py_None); - *out_values = Py_None; - } else { - data_ptr = string_arr->GetValue(i, &length); - - *out_values = make_pystring(data_ptr, length); - if (*out_values == nullptr) { - return Status::UnknownError("String initialization failed"); - } - } - ++out_values; - } - } else { - for (int64_t i = 0; i < arr->length(); ++i) { - data_ptr = string_arr->GetValue(i, &length); - *out_values = make_pystring(data_ptr, length); + const bool has_nulls = data_->null_count() > 0; + for (int64_t i = 0; i < arr->length(); ++i) { + if (has_nulls && arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values = Py_None; + } else { + data_ptr = arr->GetValue(i, &length); + *out_values = WrapBytes::Wrap(data_ptr, length); if (*out_values == nullptr) { return Status::UnknownError("String initialization failed"); } - ++out_values; } + ++out_values; } } - return Status::OK(); } @@ -839,6 +849,7 @@ class ArrowDeserializer { CONVERT_CASE(UINT64); CONVERT_CASE(FLOAT); CONVERT_CASE(DOUBLE); + CONVERT_CASE(BINARY); CONVERT_CASE(STRING); CONVERT_CASE(DATE); CONVERT_CASE(TIMESTAMP); @@ -910,7 +921,6 @@ class ArrowDeserializer { template inline typename std::enable_if::is_boolean, Status>::type ConvertValues() { - PyAcquireGIL lock; if (data_->null_count() > 0) { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); @@ -927,10 +937,18 @@ class ArrowDeserializer { template inline typename std::enable_if::type ConvertValues() { - PyAcquireGIL lock; RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); - return ConvertStrings(out_values); + return ConvertBinaryLike(out_values); + } + + template + inline typename std::enable_if< + T2 == arrow::Type::BINARY, Status>::type + ConvertValues() { + RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); + auto out_values = reinterpret_cast(PyArray_DATA(out_)); + return ConvertBinaryLike(out_values); } // ---------------------------------------------------------------------- @@ -965,54 +983,6 @@ class ArrowDeserializer { return Status::OK(); } - template - inline typename std::enable_if< - T2 == arrow::Type::BINARY, Status>::type - ConvertValues(const std::shared_ptr& data) { - size_t chunk_offset = 0; - PyAcquireGIL lock; - - RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); - - for (int c = 0; c < data->num_chunks(); c++) { - const std::shared_ptr arr = data->chunk(c); - auto binary_arr = static_cast(arr.get()); - auto out_values = reinterpret_cast(PyArray_DATA(out_)) + chunk_offset; - - const uint8_t* data_ptr; - int32_t length; - if (data->null_count() > 0) { - for (int64_t i = 0; i < arr->length(); ++i) { - if (binary_arr->IsNull(i)) { - Py_INCREF(Py_None); - out_values[i] = Py_None; - } else { - data_ptr = binary_arr->GetValue(i, &length); - - out_values[i] = PyBytes_FromStringAndSize( - reinterpret_cast(data_ptr), length); - if (out_values[i] == nullptr) { - return Status::UnknownError("String initialization failed"); - } - } - } - } else { - for (int64_t i = 0; i < arr->length(); ++i) { - data_ptr = binary_arr->GetValue(i, &length); - out_values[i] = PyBytes_FromStringAndSize( - reinterpret_cast(data_ptr), length); - if (out_values[i] == nullptr) { - return Status::UnknownError("String initialization failed"); - } - } - } - - chunk_offset += binary_arr->length(); - } - - return Status::OK(); - } - private: std::shared_ptr col_; std::shared_ptr data_; From 110692f5dbb39730295964973f352243dc252aad Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 22 Dec 2016 14:50:55 -0500 Subject: [PATCH 3/7] First draft of scaffolding for creating precise pandas.DataFrame block structure Change-Id: I3175c5ddc311fe4fec25bf427cc9ef2675e4790f --- python/src/pyarrow/adapters/pandas.cc | 794 ++++++++++++++++++++------ 1 file changed, 606 insertions(+), 188 deletions(-) diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 60e84d3802f..0aafda8a301 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -28,11 +28,13 @@ #include #include #include +#include #include "arrow/api.h" #include "arrow/status.h" #include "arrow/type_fwd.h" #include "arrow/util/bit-util.h" +#include "arrow/util/macros.h" #include "pyarrow/common.h" #include "pyarrow/config.h" @@ -41,11 +43,13 @@ namespace pyarrow { using arrow::Array; +using arrow::ChunkedArray; using arrow::Column; using arrow::Field; using arrow::DataType; using arrow::Status; using arrow::Table; +using arrow::Type; namespace BitUtil = arrow::BitUtil; @@ -548,11 +552,13 @@ struct arrow_traits { typedef typename npy_traits::value_type T; }; +static constexpr int64_t kPandasTimestampNull = std::numeric_limits::min(); + template <> struct arrow_traits { static constexpr int npy_type = NPY_DATETIME; static constexpr bool supports_nulls = true; - static constexpr int64_t na_value = std::numeric_limits::min(); + static constexpr int64_t na_value = kPandasTimestampNull; static constexpr bool is_boolean = false; static constexpr bool is_numeric_not_nullable = false; static constexpr bool is_numeric_nullable = true; @@ -563,7 +569,7 @@ template <> struct arrow_traits { static constexpr int npy_type = NPY_DATETIME; static constexpr bool supports_nulls = true; - static constexpr int64_t na_value = std::numeric_limits::min(); + static constexpr int64_t na_value = kPandasTimestampNull; static constexpr bool is_boolean = false; static constexpr bool is_numeric_not_nullable = false; static constexpr bool is_numeric_nullable = true; @@ -637,10 +643,153 @@ inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) } } +template +inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr arr = data.chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + // Upcast to double, set NaN as appropriate + + for (int i = 0; i < arr->length(); ++i) { + *out_values++ = prim_arr->IsNull(i) ? NAN : in_values[i]; + } + } +} + +template +inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr arr = data.chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); + } +} + +template +inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr arr = data.chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + for (int32_t i = 0; i < arr->length(); ++i) { + *out_values = in_values[i]; + } + } +} + +static Status ConvertBooleanWithNulls(const ChunkedArray& data, PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr arr = data.chunk(c); + auto bool_arr = static_cast(arr.get()); + + for (int64_t i = 0; i < arr->length(); ++i) { + if (bool_arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values++ = Py_None; + } else if (bool_arr->Value(i)) { + // True + Py_INCREF(Py_True); + *out_values++ = Py_True; + } else { + // False + Py_INCREF(Py_False); + *out_values++ = Py_False; + } + } + } + return Status::OK(); +} + +static void ConvertBooleanNoNulls(const ChunkedArray& data, uint8_t* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr arr = data.chunk(c); + auto bool_arr = static_cast(arr.get()); + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = static_cast(bool_arr->Value(i)); + } + } +} + +template +inline Status ConvertBinaryLike(const ChunkedArray& data, PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = static_cast(data.chunk(c).get()); + + const uint8_t* data_ptr; + int32_t length; + const bool has_nulls = data.null_count() > 0; + for (int64_t i = 0; i < arr->length(); ++i) { + if (has_nulls && arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values = Py_None; + } else { + data_ptr = arr->GetValue(i, &length); + *out_values = WrapBytes::Wrap(data_ptr, length); + if (*out_values == nullptr) { + return Status::UnknownError("String initialization failed"); + } + } + ++out_values; + } + } + return Status::OK(); +} + +template +inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr arr = data.chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + + if (arr->null_count() > 0) { + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = arr->IsNull(i) ? na_value : in_values[i]; + } + } else { + memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); + } + } +} + +template +inline void ConvertNumericNullableCast( + const ChunkedArray& data, OutType na_value, OutType* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr arr = data.chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = arr->IsNull(i) ? na_value : static_cast(in_values[i]); + } + } +} + +template +inline void ConvertDates(const ChunkedArray& data, T na_value, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr arr = data.chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + + for (int64_t i = 0; i < arr->length(); ++i) { + // There are 1000 * 60 * 60 * 24 = 86400000ms in a day + *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000; + } + } +} + class ArrowDeserializer { public: ArrowDeserializer(const std::shared_ptr& col, PyObject* py_ref) - : col_(col), data_(col->data()), py_ref_(py_ref) {} + : col_(col), data_(*col->data().get()), py_ref_(py_ref) {} Status AllocateOutput(int type) { PyAcquireGIL lock; @@ -696,137 +845,6 @@ class ArrowDeserializer { return Status::OK(); } - template - void ConvertIntegerWithNulls(double* out_values) { - for (int c = 0; c < data_->num_chunks(); c++) { - const std::shared_ptr arr = data_->chunk(c); - auto prim_arr = static_cast(arr.get()); - auto in_values = reinterpret_cast(prim_arr->data()->data()); - // Upcast to double, set NaN as appropriate - - for (int i = 0; i < arr->length(); ++i) { - *out_values++ = prim_arr->IsNull(i) ? NAN : in_values[i]; - } - } - } - - template - void ConvertIntegerNoNullsSameType(T* out_values) { - for (int c = 0; c < data_->num_chunks(); c++) { - const std::shared_ptr arr = data_->chunk(c); - auto prim_arr = static_cast(arr.get()); - auto in_values = reinterpret_cast(prim_arr->data()->data()); - memcpy(out_values, in_values, sizeof(T) * arr->length()); - out_values += arr->length(); - } - } - - template - void ConvertIntegerNoNullsCast(OutType* out_values) { - for (int c = 0; c < data_->num_chunks(); c++) { - const std::shared_ptr arr = data_->chunk(c); - auto prim_arr = static_cast(arr.get()); - auto in_values = reinterpret_cast(prim_arr->data()->data()); - for (int32_t i = 0; i < arr->length(); ++i) { - *out_values = in_values[i]; - } - } - } - - Status ConvertBooleanWithNulls(PyObject** out_values) { - PyAcquireGIL lock; - for (int c = 0; c < data_->num_chunks(); c++) { - const std::shared_ptr arr = data_->chunk(c); - auto bool_arr = static_cast(arr.get()); - - for (int64_t i = 0; i < arr->length(); ++i) { - if (bool_arr->IsNull(i)) { - Py_INCREF(Py_None); - *out_values++ = Py_None; - } else if (bool_arr->Value(i)) { - // True - Py_INCREF(Py_True); - *out_values++ = Py_True; - } else { - // False - Py_INCREF(Py_False); - *out_values++ = Py_False; - } - } - } - return Status::OK(); - } - - Status ConvertBooleanNoNulls(uint8_t* out_values) { - for (int c = 0; c < data_->num_chunks(); c++) { - const std::shared_ptr arr = data_->chunk(c); - auto bool_arr = static_cast(arr.get()); - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = static_cast(bool_arr->Value(i)); - } - } - - return Status::OK(); - } - - template - Status ConvertBinaryLike(PyObject** out_values) { - PyAcquireGIL lock; - for (int c = 0; c < data_->num_chunks(); c++) { - auto arr = static_cast(data_->chunk(c).get()); - - const uint8_t* data_ptr; - int32_t length; - const bool has_nulls = data_->null_count() > 0; - for (int64_t i = 0; i < arr->length(); ++i) { - if (has_nulls && arr->IsNull(i)) { - Py_INCREF(Py_None); - *out_values = Py_None; - } else { - data_ptr = arr->GetValue(i, &length); - *out_values = WrapBytes::Wrap(data_ptr, length); - if (*out_values == nullptr) { - return Status::UnknownError("String initialization failed"); - } - } - ++out_values; - } - } - return Status::OK(); - } - - template - void ConvertNumericNullable(T na_value, T* out_values) { - for (int c = 0; c < data_->num_chunks(); c++) { - const std::shared_ptr arr = data_->chunk(c); - auto prim_arr = static_cast(arr.get()); - auto in_values = reinterpret_cast(prim_arr->data()->data()); - - if (arr->null_count() > 0) { - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = arr->IsNull(i) ? na_value : in_values[i]; - } - } else { - memcpy(out_values, in_values, sizeof(T) * arr->length()); - out_values += arr->length(); - } - } - } - - template - void ConvertDates(T na_value, T* out_values) { - for (int c = 0; c < data_->num_chunks(); c++) { - const std::shared_ptr arr = data_->chunk(c); - auto prim_arr = static_cast(arr.get()); - auto in_values = reinterpret_cast(prim_arr->data()->data()); - - for (int64_t i = 0; i < arr->length(); ++i) { - // There are 1000 * 60 * 60 * 24 = 86400000ms in a day - *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000; - } - } - } - // ---------------------------------------------------------------------- // Allocate new array and deserialize. Can do a zero copy conversion for some // types @@ -870,13 +888,13 @@ class ArrowDeserializer { typedef typename arrow_traits::T T; int npy_type = arrow_traits::npy_type; - if (data_->num_chunks() == 1 && data_->null_count() == 0) { - return ConvertValuesZeroCopy(npy_type, data_->chunk(0)); + if (data_.num_chunks() == 1 && data_.null_count() == 0) { + return ConvertValuesZeroCopy(npy_type, data_.chunk(0)); } RETURN_NOT_OK(AllocateOutput(npy_type)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); - ConvertNumericNullable(arrow_traits::na_value, out_values); + ConvertNumericNullable(data_, arrow_traits::na_value, out_values); return Status::OK(); } @@ -888,7 +906,7 @@ class ArrowDeserializer { RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); - ConvertDates(arrow_traits::na_value, out_values); + ConvertDates(data_, arrow_traits::na_value, out_values); return Status::OK(); } @@ -900,18 +918,18 @@ class ArrowDeserializer { typedef typename arrow_traits::T T; int npy_type = arrow_traits::npy_type; - if (data_->num_chunks() == 1 && data_->null_count() == 0) { - return ConvertValuesZeroCopy(npy_type, data_->chunk(0)); + if (data_.num_chunks() == 1 && data_.null_count() == 0) { + return ConvertValuesZeroCopy(npy_type, data_.chunk(0)); } - if (data_->null_count() > 0) { + if (data_.null_count() > 0) { RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); - ConvertIntegerWithNulls(out_values); + ConvertIntegerWithNulls(data_, out_values); } else { RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); - ConvertIntegerNoNullsSameType(out_values); + ConvertIntegerNoNullsSameType(data_, out_values); } return Status::OK(); @@ -921,14 +939,14 @@ class ArrowDeserializer { template inline typename std::enable_if::is_boolean, Status>::type ConvertValues() { - if (data_->null_count() > 0) { + if (data_.null_count() > 0) { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); - RETURN_NOT_OK(ConvertBooleanWithNulls(out_values)); + RETURN_NOT_OK(ConvertBooleanWithNulls(data_, out_values)); } else { RETURN_NOT_OK(AllocateOutput(arrow_traits::npy_type)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); - RETURN_NOT_OK(ConvertBooleanNoNulls(out_values)); + ConvertBooleanNoNulls(data_, out_values); } return Status::OK(); } @@ -939,7 +957,7 @@ class ArrowDeserializer { ConvertValues() { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); - return ConvertBinaryLike(out_values); + return ConvertBinaryLike(data_, out_values); } template @@ -948,44 +966,12 @@ class ArrowDeserializer { ConvertValues() { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); - return ConvertBinaryLike(out_values); - } - - // ---------------------------------------------------------------------- - // Deserialize into pre-allocated memory - - Status ConvertPreallocated(PyArray_Descr* dtype, void* out) { -#define CONVERT_CASE(TYPE) \ - case arrow::Type::TYPE: { \ - RETURN_NOT_OK(ConvertValues()); \ - } break; - -// switch (col_->type()->type) { -// CONVERT_CASE(BOOL); -// CONVERT_CASE(INT8); -// CONVERT_CASE(INT16); -// CONVERT_CASE(INT32); -// CONVERT_CASE(INT64); -// CONVERT_CASE(UINT8); -// CONVERT_CASE(UINT16); -// CONVERT_CASE(UINT32); -// CONVERT_CASE(UINT64); -// CONVERT_CASE(FLOAT); -// CONVERT_CASE(DOUBLE); -// CONVERT_CASE(STRING); -// CONVERT_CASE(DATE); -// CONVERT_CASE(TIMESTAMP); -// default: -// return Status::NotImplemented("Arrow type reading not implemented"); -// } - -#undef CONVERT_CASE - return Status::OK(); + return ConvertBinaryLike(data_, out_values); } private: std::shared_ptr col_; - std::shared_ptr data_; + const arrow::ChunkedArray& data_; PyObject* py_ref_; PyArrayObject* out_; }; @@ -1004,15 +990,447 @@ Status ConvertColumnToPandas( return converter.Convert(out); } +// ---------------------------------------------------------------------- +// pandas 0.x DataFrame conversion internals + +class PandasBlock { + public: + enum type { + OBJECT, + INT, + UINT64, + FLOAT, + DOUBLE, + BOOL, + DATETIME, + CATEGORICAL + }; + + PandasBlock(int64_t num_rows, int num_columns) + : num_rows_(num_rows), num_columns_(num_columns) {} + + virtual Status Allocate() = 0; + virtual Status WriteNext(const std::shared_ptr& col, int placement) = 0; + + protected: + + Status AllocateNDArray(int npy_type) { + PyAcquireGIL lock; + + npy_intp block_dims[2] = {num_columns_, num_rows_}; + PyObject* block_arr = PyArray_SimpleNew(2, block_dims, npy_type); + if (block_arr == NULL) { + // TODO(wesm): propagating Python exception + return Status::OK(); + } + + npy_intp placement_dims[1] = {num_columns_}; + PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT32); + if (placement_arr == NULL) { + // TODO(wesm): propagating Python exception + return Status::OK(); + } + + block_arr_.reset(block_arr); + placement_arr_.reset(placement_arr); + current_placement_index_ = 0; + + block_data_ = reinterpret_cast( + PyArray_DATA(reinterpret_cast(block_arr))); + + placement_data_ = reinterpret_cast( + PyArray_DATA(reinterpret_cast(placement_arr))); + + return Status::OK(); + } + + int64_t num_rows_; + int num_columns_; + int current_placement_index_; + + OwnedRef block_arr_; + uint8_t* block_data_; + + // ndarray + OwnedRef placement_arr_; + int32_t* placement_data_; + + DISALLOW_COPY_AND_ASSIGN(PandasBlock); +}; + + +class ObjectBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { + return AllocateNDArray(NPY_OBJECT); + } + + Status WriteNext(const std::shared_ptr& col, int placement) override { + Type::type type = col->type()->type; + + PyObject** out_buffer = reinterpret_cast(block_data_) + + current_placement_index_ * num_rows_; + + const ChunkedArray& data = *col->data().get(); + + if (type == Type::BOOL) { + RETURN_NOT_OK(ConvertBooleanWithNulls(data, out_buffer)); + } else if (type == Type::BINARY) { + RETURN_NOT_OK(ConvertBinaryLike(data, out_buffer)); + } else if (type == Type::STRING) { + RETURN_NOT_OK(ConvertBinaryLike(data, out_buffer)); + } else { + std::stringstream ss; + ss << "Unsupported type for object array output: " + << col->type()->ToString(); + return Status::NotImplemented(ss.str()); + } + + placement_data_[current_placement_index_++] = placement; + return Status::OK(); + } +}; + +class Int64Block : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { + return AllocateNDArray(NPY_INT64); + } + + Status WriteNext(const std::shared_ptr& col, int placement) override { + Type::type type = col->type()->type; + + int64_t* out_buffer = reinterpret_cast(block_data_) + + current_placement_index_ * num_rows_; + + const ChunkedArray& data = *col->data().get(); + +#define TYPE_CASE(IN_TYPE) \ + ConvertIntegerNoNullsCast(data, out_buffer); \ + break; + + if (col->null_count() > 0) { + return Status::Invalid("Nulls not supported in integer blocks"); + } else { + switch (type) { + case Type::UINT8: TYPE_CASE(uint8_t); + case Type::INT8: TYPE_CASE(int8_t); + case Type::UINT16: TYPE_CASE(uint16_t); + case Type::INT16: TYPE_CASE(int16_t); + case Type::UINT32: TYPE_CASE(uint32_t); + case Type::INT32: TYPE_CASE(int32_t); + case Type::INT64: + ConvertIntegerNoNullsSameType(data, out_buffer); + break; + default: + return Status::NotImplemented(col->type()->ToString()); + } + } + +#undef TYPE_CASE + + placement_data_[current_placement_index_++] = placement; + return Status::OK(); + } +}; + +class UInt64Block : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { + return AllocateNDArray(NPY_UINT64); + } + + Status WriteNext(const std::shared_ptr& col, int placement) override { + Type::type type = col->type()->type; + + uint64_t* out_buffer = reinterpret_cast(block_data_) + + current_placement_index_ * num_rows_; + + const ChunkedArray& data = *col->data().get(); + + if (type != Type::UINT64) { + return Status::NotImplemented(col->type()->ToString()); + } + + ConvertIntegerNoNullsSameType(data, out_buffer); + placement_data_[current_placement_index_++] = placement; + return Status::OK(); + } +}; + +class Float32Block : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { + return AllocateNDArray(NPY_FLOAT32); + } + + Status WriteNext(const std::shared_ptr& col, int placement) override { + Type::type type = col->type()->type; + + if (type != Type::FLOAT) { + return Status::NotImplemented(col->type()->ToString()); + } + + float* out_buffer = reinterpret_cast(block_data_) + + current_placement_index_ * num_rows_; + + ConvertNumericNullable(*col->data().get(), NAN, out_buffer); + placement_data_[current_placement_index_++] = placement; + return Status::OK(); + } +}; + +class Float64Block : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { + return AllocateNDArray(NPY_FLOAT64); + } + + Status WriteNext(const std::shared_ptr& col, int placement) override { + Type::type type = col->type()->type; + + double* out_buffer = reinterpret_cast(block_data_) + + current_placement_index_ * num_rows_; + + const ChunkedArray& data = *col->data().get(); + +#define INTEGER_CASE(IN_TYPE) \ + ConvertIntegerWithNulls(data, out_buffer); \ + break; + + switch (type) { + case Type::UINT8: INTEGER_CASE(uint8_t); + case Type::INT8: INTEGER_CASE(int8_t); + case Type::UINT16: INTEGER_CASE(uint16_t); + case Type::INT16: INTEGER_CASE(int16_t); + case Type::UINT32: INTEGER_CASE(uint32_t); + case Type::INT32: INTEGER_CASE(int32_t); + case Type::UINT64: INTEGER_CASE(int32_t); + case Type::INT64: INTEGER_CASE(int32_t); + case Type::FLOAT: + ConvertNumericNullableCast(data, NAN, out_buffer); + break; + case Type::DOUBLE: + ConvertNumericNullable(data, NAN, out_buffer); + break; + default: + return Status::NotImplemented(col->type()->ToString()); + } + +#undef INTEGER_CASE + + placement_data_[current_placement_index_++] = placement; + return Status::OK(); + } +}; + +class BoolBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { + return AllocateNDArray(NPY_BOOL); + } + + Status WriteNext(const std::shared_ptr& col, int placement) override { + Type::type type = col->type()->type; + + if (type != Type::BOOL) { + return Status::NotImplemented(col->type()->ToString()); + } + + uint8_t* out_buffer = reinterpret_cast(block_data_) + + current_placement_index_ * num_rows_; + + ConvertBooleanNoNulls(*col->data().get(), out_buffer); + placement_data_[current_placement_index_++] = placement; + return Status::OK(); + } +}; + +class DatetimeBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { + RETURN_NOT_OK(AllocateNDArray(NPY_DATETIME)); + + PyAcquireGIL lock; + auto date_dtype = reinterpret_cast( + PyArray_DESCR(reinterpret_cast(block_arr_.obj()))->c_metadata); + date_dtype->meta.base = NPY_FR_ns; + return Status::OK(); + } + + Status WriteNext(const std::shared_ptr& col, int placement) override { + Type::type type = col->type()->type; + + if (type != Type::TIMESTAMP) { + return Status::NotImplemented(col->type()->ToString()); + } + + int64_t* out_buffer = reinterpret_cast(block_data_) + + current_placement_index_ * num_rows_; + + ConvertNumericNullable(*col->data().get(), kPandasTimestampNull, out_buffer); + placement_data_[current_placement_index_++] = placement; + return Status::OK(); + } +}; + +// class CategoricalBlock : public PandasBlock {}; + + +Status MakeBlock(PandasBlock::type type, int64_t num_rows, int num_columns, + std::shared_ptr* block) { + switch (type) { + case PandasBlock::OBJECT: + *block = std::make_shared(num_rows, num_columns); + break; + case PandasBlock::INT: + *block = std::make_shared(num_rows, num_columns); + break; + case PandasBlock::UINT64: + *block = std::make_shared(num_rows, num_columns); + break; + case PandasBlock::FLOAT: + *block = std::make_shared(num_rows, num_columns); + break; + case PandasBlock::DOUBLE: + *block = std::make_shared(num_rows, num_columns); + break; + case PandasBlock::BOOL: + *block = std::make_shared(num_rows, num_columns); + break; + case PandasBlock::DATETIME: + *block = std::make_shared(num_rows, num_columns); + break; + case PandasBlock::CATEGORICAL: + return Status::NotImplemented("categorical"); + } + + return (*block)->Allocate(); +} + +// Construct the exact pandas 0.x "BlockManager" memory layout +// +// * For each column determine the correct output pandas type +// * Allocate 2D blocks (ncols x nrows) for each distinct data type in output +// * Allocate block placement arrays +// * Write Arrow columns out into each slice of memory; populate block +// * placement arrays as we go +class DataFrameBlockCreator { + public: + DataFrameBlockCreator(const std::shared_ptr
& table) + : table_(table) {} + + Status Convert() { + column_types_.resize(table_->num_columns()); + type_counts_.clear(); + blocks_.clear(); + + RETURN_NOT_OK(CountColumnTypes()); + RETURN_NOT_OK(CreateBlocks()); + RETURN_NOT_OK(WriteTableToBlocks()); + } + + Status CountColumnTypes() { + for (int i = 0; i < table_->num_columns(); ++i) { + std::shared_ptr col = table_->column(i); + PandasBlock::type output_type; + + switch (col->type()->type) { + case Type::BOOL: + output_type = col->null_count() > 0? PandasBlock::OBJECT : PandasBlock::BOOL; + break; + case Type::UINT8: + case Type::INT8: + case Type::UINT16: + case Type::INT16: + case Type::UINT32: + case Type::INT32: + case Type::UINT64: + case Type::INT64: + output_type = col->null_count() > 0? PandasBlock::DOUBLE : PandasBlock::INT; + break; + case Type::FLOAT: + output_type = PandasBlock::FLOAT; + break; + case Type::DOUBLE: + output_type = PandasBlock::DOUBLE; + break; + case Type::STRING: + case Type::BINARY: + output_type = PandasBlock::OBJECT; + break; + case Type::TIMESTAMP: + output_type = PandasBlock::DATETIME; + break; + default: + return Status::NotImplemented(col->type()->ToString()); + } + + auto it = type_counts_.find(output_type); + if (it != type_counts_.end()) { + // Increment count + it->second += 1; + } else { + // Add key to map + type_counts_[output_type] = 1; + } + + column_types_[i] = output_type; + } + return Status::OK(); + } + + Status CreateBlocks() { + for (const auto& it : type_counts_) { + PandasBlock::type type = static_cast(it.first); + std::shared_ptr block; + RETURN_NOT_OK(MakeBlock(type, table_->num_rows(), it.second, &block)); + blocks_[type] = block; + } + return Status::OK(); + } + + Status WriteTableToBlocks() { + for (int i = 0; i < table_->num_columns(); ++i) { + std::shared_ptr col = table_->column(i); + PandasBlock::type output_type = column_types_[i]; + + auto it = blocks_.find(output_type); + if (it == blocks_.end()) { + return Status::KeyError("No block allocated"); + } + RETURN_NOT_OK(it->second->WriteNext(col, i)); + } + return Status::OK(); + } + + private: + std::shared_ptr
table_; + std::vector column_types_; + + // block type -> type count + std::unordered_map type_counts_; + + // block type -> block + std::unordered_map> blocks_; +}; + Status ConvertTableToPandas( const std::shared_ptr
& table, int nthreads, PyObject** out) { - // Construct the exact pandas 0.x "BlockManager" memory layout - // - // * For each column determine the correct output pandas type - // * Allocate 2D blocks (ncols x nrows) for each distinct data type in output - // * Allocate block placement arrays - // * Write Arrow columns out into each slice of memory; populate block - // * placement arrays as we go return Status::OK(); } From af960ee9e23148816fe90e790b976338b0b6b0a8 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 22 Dec 2016 15:13:39 -0500 Subject: [PATCH 4/7] Draft blocks -> DataFrame scaffold Change-Id: If71efc513509c7e5539694728506eeef8b76bf45 --- python/pyarrow/includes/pyarrow.pxd | 5 ++- python/pyarrow/table.pyx | 55 +++++++++++++++++++++------ python/src/pyarrow/adapters/pandas.cc | 49 ++++++++++++++++++++++-- 3 files changed, 92 insertions(+), 17 deletions(-) diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd index a5444c236bc..dc6ccd20259 100644 --- a/python/pyarrow/includes/pyarrow.pxd +++ b/python/pyarrow/includes/pyarrow.pxd @@ -18,7 +18,7 @@ # distutils: language = c++ from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport (CArray, CBuffer, CColumn, +from pyarrow.includes.libarrow cimport (CArray, CBuffer, CColumn, CTable, CDataType, CStatus, Type, MemoryPool) cimport pyarrow.includes.libarrow_io as arrow_io @@ -39,6 +39,9 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil: CStatus ConvertColumnToPandas(const shared_ptr[CColumn]& arr, PyObject* py_ref, PyObject** out) + CStatus ConvertTableToPandas(const shared_ptr[CTable]& table, + int nthreads, PyObject** out) + MemoryPool* get_memory_pool() diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 2f7d4309e45..74e035a6d62 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -430,6 +430,32 @@ cdef class RecordBatch: return result +cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads): + cdef: + PyObject* result_obj + CColumn* col + int i + + from pandas.core.internals import BlockManager, make_block + from pandas import RangeIndex + + check_status(pyarrow.ConvertTableToPandas(table, nthreads, &result_obj)) + + result = PyObject_to_object(result_obj) + + blocks = [] + for block_arr, placement_arr in result: + blocks.append(make_block(block_arr, placement=placement_arr)) + + names = [] + for i in range(table.get().num_columns()): + col = table.get().column(i).get() + names.append(frombytes(col.name())) + + axes = [names, RangeIndex(table.get().num_rows())] + return BlockManager(blocks, axes) + + cdef class Table: """ A collection of top-level named, equal length Arrow arrays. @@ -584,7 +610,7 @@ cdef class Table: table.init(c_table) return table - def to_pandas(self): + def to_pandas(self, nthreads=1, block_based=True): """ Convert the arrow::Table to a pandas DataFrame @@ -597,19 +623,24 @@ cdef class Table: shared_ptr[CColumn] col Column column + from pandas.core.internals import make_block, BlockManager import pandas as pd - names = [] - data = [] - for i in range(self.table.num_columns()): - col = self.table.column(i) - column = self.column(i) - check_status(pyarrow.ConvertColumnToPandas( - col, column, &arr)) - names.append(frombytes(col.get().name())) - data.append(PyObject_to_object(arr)) - - return pd.DataFrame(dict(zip(names, data)), columns=names) + if block_based: + mgr = table_to_blockmanager(self.sp_table, nthreads) + return pd.DataFrame(mgr) + else: + names = [] + data = [] + for i in range(self.table.num_columns()): + col = self.table.column(i) + column = self.column(i) + check_status(pyarrow.ConvertColumnToPandas( + col, column, &arr)) + names.append(frombytes(col.get().name())) + data.append(PyObject_to_object(arr)) + + return pd.DataFrame(dict(zip(names, data)), columns=names) @property def name(self): diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 0aafda8a301..24caf101100 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -1012,6 +1012,14 @@ class PandasBlock { virtual Status Allocate() = 0; virtual Status WriteNext(const std::shared_ptr& col, int placement) = 0; + PyObject* block_arr() { + return block_arr_.obj(); + } + + PyObject* placement_arr() { + return block_arr_.obj(); + } + protected: Status AllocateNDArray(int npy_type) { @@ -1334,14 +1342,16 @@ class DataFrameBlockCreator { DataFrameBlockCreator(const std::shared_ptr
& table) : table_(table) {} - Status Convert() { + Status Convert(int nthreads, PyObject** output) { column_types_.resize(table_->num_columns()); type_counts_.clear(); blocks_.clear(); RETURN_NOT_OK(CountColumnTypes()); RETURN_NOT_OK(CreateBlocks()); - RETURN_NOT_OK(WriteTableToBlocks()); + RETURN_NOT_OK(WriteTableToBlocks(nthreads)); + + return GetResultList(output); } Status CountColumnTypes() { @@ -1404,7 +1414,11 @@ class DataFrameBlockCreator { return Status::OK(); } - Status WriteTableToBlocks() { + Status WriteTableToBlocks(int nthreads) { + if (nthreads > 1) { + return Status::NotImplemented("multithreading not yet implemented"); + } + for (int i = 0; i < table_->num_columns(); ++i) { std::shared_ptr col = table_->column(i); PandasBlock::type output_type = column_types_[i]; @@ -1418,6 +1432,32 @@ class DataFrameBlockCreator { return Status::OK(); } + Status GetResultList(PyObject** out) { + auto num_blocks = static_cast(blocks_.size()); + PyObject* result = PyList_New(num_blocks); + RETURN_IF_PYERROR(); + + for (const auto& it : blocks_) { + const std::shared_ptr block = it.second; + + PyObject* item = PyTuple_New(2); + RETURN_IF_PYERROR(); + + PyObject* block_arr = block->block_arr(); + PyObject* placement_arr = block->placement_arr(); + Py_INCREF(block_arr); + Py_INCREF(placement_arr); + PyTuple_SET_ITEM(item, 0, block_arr); + PyTuple_SET_ITEM(item, 1, placement_arr); + + if (PyList_Append(result, item) < 0) { + RETURN_IF_PYERROR(); + } + } + *out = result; + return Status::OK(); + } + private: std::shared_ptr
table_; std::vector column_types_; @@ -1431,7 +1471,8 @@ class DataFrameBlockCreator { Status ConvertTableToPandas( const std::shared_ptr
& table, int nthreads, PyObject** out) { - return Status::OK(); + DataFrameBlockCreator helper(table); + return helper.Convert(nthreads, out); } } // namespace pyarrow From ec239b87240e3bf3ae8c8be10ea05772a7d0c621 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 22 Dec 2016 15:29:11 -0500 Subject: [PATCH 5/7] Fix DataFrame constructions, code formatting Change-Id: I5935b9d28b0d3c13fdb46ddc71fb7ab1d9031878 --- python/src/pyarrow/adapters/builtin.cc | 24 ++-- python/src/pyarrow/adapters/pandas.cc | 192 +++++++++++-------------- python/src/pyarrow/helpers.cc | 34 ++--- 3 files changed, 111 insertions(+), 139 deletions(-) diff --git a/python/src/pyarrow/adapters/builtin.cc b/python/src/pyarrow/adapters/builtin.cc index 1567be95e2a..fb7475f0c94 100644 --- a/python/src/pyarrow/adapters/builtin.cc +++ b/python/src/pyarrow/adapters/builtin.cc @@ -44,16 +44,16 @@ static inline bool IsPyInteger(PyObject* obj) { class ScalarVisitor { public: - ScalarVisitor() : - total_count_(0), - none_count_(0), - bool_count_(0), - int_count_(0), - date_count_(0), - timestamp_count_(0), - float_count_(0), - binary_count_(0), - unicode_count_(0) {} + ScalarVisitor() + : total_count_(0), + none_count_(0), + bool_count_(0), + int_count_(0), + date_count_(0), + timestamp_count_(0), + float_count_(0), + binary_count_(0), + unicode_count_(0) {} void Visit(PyObject* obj) { ++total_count_; @@ -215,9 +215,7 @@ static Status InferArrowType( } // For 0-length sequences, refuse to guess - if (*size == 0) { - *out_type = arrow::null(); - } + if (*size == 0) { *out_type = arrow::null(); } SeqVisitor seq_visitor; RETURN_NOT_OK(seq_visitor.Visit(obj)); diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 24caf101100..20fed607124 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -221,8 +221,8 @@ class ArrowSerializer { if (have_bytes) { const auto& arr = static_cast(*out->get()); - *out = std::make_shared(arr.length(), arr.offsets(), - arr.data(), arr.null_count(), arr.null_bitmap()); + *out = std::make_shared( + arr.length(), arr.offsets(), arr.data(), arr.null_count(), arr.null_bitmap()); } return Status::OK(); } @@ -821,8 +821,8 @@ class ArrowDeserializer { // Zero-Copy. We can pass the data pointer directly to NumPy. npy_intp dims[1] = {col_->length()}; - out_ = - reinterpret_cast(PyArray_SimpleNewFromData(1, dims, npy_type, data)); + out_ = reinterpret_cast( + PyArray_SimpleNewFromData(1, dims, npy_type, data)); if (out_ == NULL) { // Error occurred, trust that SimpleNew set the error state @@ -961,8 +961,7 @@ class ArrowDeserializer { } template - inline typename std::enable_if< - T2 == arrow::Type::BINARY, Status>::type + inline typename std::enable_if::type ConvertValues() { RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); auto out_values = reinterpret_cast(PyArray_DATA(out_)); @@ -995,33 +994,19 @@ Status ConvertColumnToPandas( class PandasBlock { public: - enum type { - OBJECT, - INT, - UINT64, - FLOAT, - DOUBLE, - BOOL, - DATETIME, - CATEGORICAL - }; + enum type { OBJECT, INT, UINT64, FLOAT, DOUBLE, BOOL, DATETIME, CATEGORICAL }; PandasBlock(int64_t num_rows, int num_columns) : num_rows_(num_rows), num_columns_(num_columns) {} virtual Status Allocate() = 0; - virtual Status WriteNext(const std::shared_ptr& col, int placement) = 0; + virtual Status WriteNext(const std::shared_ptr& col, int64_t placement) = 0; - PyObject* block_arr() { - return block_arr_.obj(); - } + PyObject* block_arr() { return block_arr_.obj(); } - PyObject* placement_arr() { - return block_arr_.obj(); - } + PyObject* placement_arr() { return placement_arr_.obj(); } protected: - Status AllocateNDArray(int npy_type) { PyAcquireGIL lock; @@ -1033,7 +1018,7 @@ class PandasBlock { } npy_intp placement_dims[1] = {num_columns_}; - PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT32); + PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64); if (placement_arr == NULL) { // TODO(wesm): propagating Python exception return Status::OK(); @@ -1046,7 +1031,7 @@ class PandasBlock { block_data_ = reinterpret_cast( PyArray_DATA(reinterpret_cast(block_arr))); - placement_data_ = reinterpret_cast( + placement_data_ = reinterpret_cast( PyArray_DATA(reinterpret_cast(placement_arr))); return Status::OK(); @@ -1061,25 +1046,22 @@ class PandasBlock { // ndarray OwnedRef placement_arr_; - int32_t* placement_data_; + int64_t* placement_data_; DISALLOW_COPY_AND_ASSIGN(PandasBlock); }; - class ObjectBlock : public PandasBlock { public: using PandasBlock::PandasBlock; - Status Allocate() override { - return AllocateNDArray(NPY_OBJECT); - } + Status Allocate() override { return AllocateNDArray(NPY_OBJECT); } - Status WriteNext(const std::shared_ptr& col, int placement) override { + Status WriteNext(const std::shared_ptr& col, int64_t placement) override { Type::type type = col->type()->type; - PyObject** out_buffer = reinterpret_cast(block_data_) + - current_placement_index_ * num_rows_; + PyObject** out_buffer = + reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; const ChunkedArray& data = *col->data().get(); @@ -1091,8 +1073,7 @@ class ObjectBlock : public PandasBlock { RETURN_NOT_OK(ConvertBinaryLike(data, out_buffer)); } else { std::stringstream ss; - ss << "Unsupported type for object array output: " - << col->type()->ToString(); + ss << "Unsupported type for object array output: " << col->type()->ToString(); return Status::NotImplemented(ss.str()); } @@ -1105,32 +1086,36 @@ class Int64Block : public PandasBlock { public: using PandasBlock::PandasBlock; - Status Allocate() override { - return AllocateNDArray(NPY_INT64); - } + Status Allocate() override { return AllocateNDArray(NPY_INT64); } - Status WriteNext(const std::shared_ptr& col, int placement) override { + Status WriteNext(const std::shared_ptr& col, int64_t placement) override { Type::type type = col->type()->type; - int64_t* out_buffer = reinterpret_cast(block_data_) + - current_placement_index_ * num_rows_; + int64_t* out_buffer = + reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; const ChunkedArray& data = *col->data().get(); -#define TYPE_CASE(IN_TYPE) \ - ConvertIntegerNoNullsCast(data, out_buffer); \ - break; +#define TYPE_CASE(IN_TYPE) \ + ConvertIntegerNoNullsCast(data, out_buffer); \ + break; if (col->null_count() > 0) { return Status::Invalid("Nulls not supported in integer blocks"); } else { switch (type) { - case Type::UINT8: TYPE_CASE(uint8_t); - case Type::INT8: TYPE_CASE(int8_t); - case Type::UINT16: TYPE_CASE(uint16_t); - case Type::INT16: TYPE_CASE(int16_t); - case Type::UINT32: TYPE_CASE(uint32_t); - case Type::INT32: TYPE_CASE(int32_t); + case Type::UINT8: + TYPE_CASE(uint8_t); + case Type::INT8: + TYPE_CASE(int8_t); + case Type::UINT16: + TYPE_CASE(uint16_t); + case Type::INT16: + TYPE_CASE(int16_t); + case Type::UINT32: + TYPE_CASE(uint32_t); + case Type::INT32: + TYPE_CASE(int32_t); case Type::INT64: ConvertIntegerNoNullsSameType(data, out_buffer); break; @@ -1150,21 +1135,17 @@ class UInt64Block : public PandasBlock { public: using PandasBlock::PandasBlock; - Status Allocate() override { - return AllocateNDArray(NPY_UINT64); - } + Status Allocate() override { return AllocateNDArray(NPY_UINT64); } - Status WriteNext(const std::shared_ptr& col, int placement) override { + Status WriteNext(const std::shared_ptr& col, int64_t placement) override { Type::type type = col->type()->type; - uint64_t* out_buffer = reinterpret_cast(block_data_) + - current_placement_index_ * num_rows_; + uint64_t* out_buffer = + reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; const ChunkedArray& data = *col->data().get(); - if (type != Type::UINT64) { - return Status::NotImplemented(col->type()->ToString()); - } + if (type != Type::UINT64) { return Status::NotImplemented(col->type()->ToString()); } ConvertIntegerNoNullsSameType(data, out_buffer); placement_data_[current_placement_index_++] = placement; @@ -1176,19 +1157,15 @@ class Float32Block : public PandasBlock { public: using PandasBlock::PandasBlock; - Status Allocate() override { - return AllocateNDArray(NPY_FLOAT32); - } + Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); } - Status WriteNext(const std::shared_ptr& col, int placement) override { + Status WriteNext(const std::shared_ptr& col, int64_t placement) override { Type::type type = col->type()->type; - if (type != Type::FLOAT) { - return Status::NotImplemented(col->type()->ToString()); - } + if (type != Type::FLOAT) { return Status::NotImplemented(col->type()->ToString()); } - float* out_buffer = reinterpret_cast(block_data_) + - current_placement_index_ * num_rows_; + float* out_buffer = + reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; ConvertNumericNullable(*col->data().get(), NAN, out_buffer); placement_data_[current_placement_index_++] = placement; @@ -1200,31 +1177,37 @@ class Float64Block : public PandasBlock { public: using PandasBlock::PandasBlock; - Status Allocate() override { - return AllocateNDArray(NPY_FLOAT64); - } + Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); } - Status WriteNext(const std::shared_ptr& col, int placement) override { + Status WriteNext(const std::shared_ptr& col, int64_t placement) override { Type::type type = col->type()->type; - double* out_buffer = reinterpret_cast(block_data_) + - current_placement_index_ * num_rows_; + double* out_buffer = + reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; const ChunkedArray& data = *col->data().get(); -#define INTEGER_CASE(IN_TYPE) \ - ConvertIntegerWithNulls(data, out_buffer); \ - break; +#define INTEGER_CASE(IN_TYPE) \ + ConvertIntegerWithNulls(data, out_buffer); \ + break; switch (type) { - case Type::UINT8: INTEGER_CASE(uint8_t); - case Type::INT8: INTEGER_CASE(int8_t); - case Type::UINT16: INTEGER_CASE(uint16_t); - case Type::INT16: INTEGER_CASE(int16_t); - case Type::UINT32: INTEGER_CASE(uint32_t); - case Type::INT32: INTEGER_CASE(int32_t); - case Type::UINT64: INTEGER_CASE(int32_t); - case Type::INT64: INTEGER_CASE(int32_t); + case Type::UINT8: + INTEGER_CASE(uint8_t); + case Type::INT8: + INTEGER_CASE(int8_t); + case Type::UINT16: + INTEGER_CASE(uint16_t); + case Type::INT16: + INTEGER_CASE(int16_t); + case Type::UINT32: + INTEGER_CASE(uint32_t); + case Type::INT32: + INTEGER_CASE(int32_t); + case Type::UINT64: + INTEGER_CASE(int32_t); + case Type::INT64: + INTEGER_CASE(int32_t); case Type::FLOAT: ConvertNumericNullableCast(data, NAN, out_buffer); break; @@ -1246,19 +1229,15 @@ class BoolBlock : public PandasBlock { public: using PandasBlock::PandasBlock; - Status Allocate() override { - return AllocateNDArray(NPY_BOOL); - } + Status Allocate() override { return AllocateNDArray(NPY_BOOL); } - Status WriteNext(const std::shared_ptr& col, int placement) override { + Status WriteNext(const std::shared_ptr& col, int64_t placement) override { Type::type type = col->type()->type; - if (type != Type::BOOL) { - return Status::NotImplemented(col->type()->ToString()); - } + if (type != Type::BOOL) { return Status::NotImplemented(col->type()->ToString()); } - uint8_t* out_buffer = reinterpret_cast(block_data_) + - current_placement_index_ * num_rows_; + uint8_t* out_buffer = + reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; ConvertBooleanNoNulls(*col->data().get(), out_buffer); placement_data_[current_placement_index_++] = placement; @@ -1280,15 +1259,15 @@ class DatetimeBlock : public PandasBlock { return Status::OK(); } - Status WriteNext(const std::shared_ptr& col, int placement) override { + Status WriteNext(const std::shared_ptr& col, int64_t placement) override { Type::type type = col->type()->type; if (type != Type::TIMESTAMP) { return Status::NotImplemented(col->type()->ToString()); } - int64_t* out_buffer = reinterpret_cast(block_data_) + - current_placement_index_ * num_rows_; + int64_t* out_buffer = + reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; ConvertNumericNullable(*col->data().get(), kPandasTimestampNull, out_buffer); placement_data_[current_placement_index_++] = placement; @@ -1298,7 +1277,6 @@ class DatetimeBlock : public PandasBlock { // class CategoricalBlock : public PandasBlock {}; - Status MakeBlock(PandasBlock::type type, int64_t num_rows, int num_columns, std::shared_ptr* block) { switch (type) { @@ -1339,8 +1317,7 @@ Status MakeBlock(PandasBlock::type type, int64_t num_rows, int num_columns, // * placement arrays as we go class DataFrameBlockCreator { public: - DataFrameBlockCreator(const std::shared_ptr
& table) - : table_(table) {} + DataFrameBlockCreator(const std::shared_ptr
& table) : table_(table) {} Status Convert(int nthreads, PyObject** output) { column_types_.resize(table_->num_columns()); @@ -1361,7 +1338,7 @@ class DataFrameBlockCreator { switch (col->type()->type) { case Type::BOOL: - output_type = col->null_count() > 0? PandasBlock::OBJECT : PandasBlock::BOOL; + output_type = col->null_count() > 0 ? PandasBlock::OBJECT : PandasBlock::BOOL; break; case Type::UINT8: case Type::INT8: @@ -1371,7 +1348,7 @@ class DataFrameBlockCreator { case Type::INT32: case Type::UINT64: case Type::INT64: - output_type = col->null_count() > 0? PandasBlock::DOUBLE : PandasBlock::INT; + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT; break; case Type::FLOAT: output_type = PandasBlock::FLOAT; @@ -1424,9 +1401,7 @@ class DataFrameBlockCreator { PandasBlock::type output_type = column_types_[i]; auto it = blocks_.find(output_type); - if (it == blocks_.end()) { - return Status::KeyError("No block allocated"); - } + if (it == blocks_.end()) { return Status::KeyError("No block allocated"); } RETURN_NOT_OK(it->second->WriteNext(col, i)); } return Status::OK(); @@ -1437,6 +1412,7 @@ class DataFrameBlockCreator { PyObject* result = PyList_New(num_blocks); RETURN_IF_PYERROR(); + int i = 0; for (const auto& it : blocks_) { const std::shared_ptr block = it.second; @@ -1450,9 +1426,7 @@ class DataFrameBlockCreator { PyTuple_SET_ITEM(item, 0, block_arr); PyTuple_SET_ITEM(item, 1, placement_arr); - if (PyList_Append(result, item) < 0) { - RETURN_IF_PYERROR(); - } + if (PyList_SET_ITEM(result, i++, item) < 0) { RETURN_IF_PYERROR(); } } *out = result; return Status::OK(); diff --git a/python/src/pyarrow/helpers.cc b/python/src/pyarrow/helpers.cc index bc71175762b..3f650326e09 100644 --- a/python/src/pyarrow/helpers.cc +++ b/python/src/pyarrow/helpers.cc @@ -23,32 +23,32 @@ using namespace arrow; namespace pyarrow { -#define GET_PRIMITIVE_TYPE(NAME, FACTORY) \ - case Type::NAME: \ - return FACTORY(); \ +#define GET_PRIMITIVE_TYPE(NAME, FACTORY) \ + case Type::NAME: \ + return FACTORY(); \ break; std::shared_ptr GetPrimitiveType(Type::type type) { switch (type) { case Type::NA: return null(); - GET_PRIMITIVE_TYPE(UINT8, uint8); - GET_PRIMITIVE_TYPE(INT8, int8); - GET_PRIMITIVE_TYPE(UINT16, uint16); - GET_PRIMITIVE_TYPE(INT16, int16); - GET_PRIMITIVE_TYPE(UINT32, uint32); - GET_PRIMITIVE_TYPE(INT32, int32); - GET_PRIMITIVE_TYPE(UINT64, uint64); - GET_PRIMITIVE_TYPE(INT64, int64); - GET_PRIMITIVE_TYPE(DATE, date); + GET_PRIMITIVE_TYPE(UINT8, uint8); + GET_PRIMITIVE_TYPE(INT8, int8); + GET_PRIMITIVE_TYPE(UINT16, uint16); + GET_PRIMITIVE_TYPE(INT16, int16); + GET_PRIMITIVE_TYPE(UINT32, uint32); + GET_PRIMITIVE_TYPE(INT32, int32); + GET_PRIMITIVE_TYPE(UINT64, uint64); + GET_PRIMITIVE_TYPE(INT64, int64); + GET_PRIMITIVE_TYPE(DATE, date); case Type::TIMESTAMP: return arrow::timestamp(arrow::TimeUnit::MICRO); break; - GET_PRIMITIVE_TYPE(BOOL, boolean); - GET_PRIMITIVE_TYPE(FLOAT, float32); - GET_PRIMITIVE_TYPE(DOUBLE, float64); - GET_PRIMITIVE_TYPE(BINARY, binary); - GET_PRIMITIVE_TYPE(STRING, utf8); + GET_PRIMITIVE_TYPE(BOOL, boolean); + GET_PRIMITIVE_TYPE(FLOAT, float32); + GET_PRIMITIVE_TYPE(DOUBLE, float64); + GET_PRIMITIVE_TYPE(BINARY, binary); + GET_PRIMITIVE_TYPE(STRING, utf8); default: return nullptr; } From ea83ded3f876d7f24cbdcf1f926a9d183c7ad86c Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 22 Dec 2016 16:04:48 -0500 Subject: [PATCH 6/7] Unit tests pass again Change-Id: I134f1acb3463df47f9cf5d9dadbd4b9145a4f2f6 --- python/src/pyarrow/adapters/pandas.cc | 201 +++++++++++++++----------- 1 file changed, 115 insertions(+), 86 deletions(-) diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 20fed607124..899eb5519d5 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -747,9 +747,11 @@ inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_ auto prim_arr = static_cast(arr.get()); auto in_values = reinterpret_cast(prim_arr->data()->data()); + const uint8_t* valid_bits = arr->null_bitmap_data(); + if (arr->null_count() > 0) { for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = arr->IsNull(i) ? na_value : in_values[i]; + *out_values++ = BitUtil::BitNotSet(valid_bits, i) ? na_value : in_values[i]; } } else { memcpy(out_values, in_values, sizeof(T) * arr->length()); @@ -786,6 +788,20 @@ inline void ConvertDates(const ChunkedArray& data, T na_value, T* out_values) { } } +template +inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr arr = data.chunk(c); + auto prim_arr = static_cast(arr.get()); + auto in_values = reinterpret_cast(prim_arr->data()->data()); + + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = arr->IsNull(i) ? kPandasTimestampNull + : (static_cast(in_values[i]) * SHIFT); + } + } +} + class ArrowDeserializer { public: ArrowDeserializer(const std::shared_ptr& col, PyObject* py_ref) @@ -994,7 +1010,22 @@ Status ConvertColumnToPandas( class PandasBlock { public: - enum type { OBJECT, INT, UINT64, FLOAT, DOUBLE, BOOL, DATETIME, CATEGORICAL }; + enum type { + OBJECT, + UINT8, + INT8, + UINT16, + INT16, + UINT32, + INT32, + UINT64, + INT64, + FLOAT, + DOUBLE, + BOOL, + DATETIME, + CATEGORICAL + }; PandasBlock(int64_t num_rows, int num_columns) : num_rows_(num_rows), num_columns_(num_columns) {} @@ -1082,77 +1113,40 @@ class ObjectBlock : public PandasBlock { } }; -class Int64Block : public PandasBlock { +template +class IntBlock : public PandasBlock { public: using PandasBlock::PandasBlock; - Status Allocate() override { return AllocateNDArray(NPY_INT64); } - - Status WriteNext(const std::shared_ptr& col, int64_t placement) override { - Type::type type = col->type()->type; - - int64_t* out_buffer = - reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; - - const ChunkedArray& data = *col->data().get(); - -#define TYPE_CASE(IN_TYPE) \ - ConvertIntegerNoNullsCast(data, out_buffer); \ - break; - - if (col->null_count() > 0) { - return Status::Invalid("Nulls not supported in integer blocks"); - } else { - switch (type) { - case Type::UINT8: - TYPE_CASE(uint8_t); - case Type::INT8: - TYPE_CASE(int8_t); - case Type::UINT16: - TYPE_CASE(uint16_t); - case Type::INT16: - TYPE_CASE(int16_t); - case Type::UINT32: - TYPE_CASE(uint32_t); - case Type::INT32: - TYPE_CASE(int32_t); - case Type::INT64: - ConvertIntegerNoNullsSameType(data, out_buffer); - break; - default: - return Status::NotImplemented(col->type()->ToString()); - } - } - -#undef TYPE_CASE - - placement_data_[current_placement_index_++] = placement; - return Status::OK(); + Status Allocate() override { + return AllocateNDArray(arrow_traits::npy_type); } -}; - -class UInt64Block : public PandasBlock { - public: - using PandasBlock::PandasBlock; - - Status Allocate() override { return AllocateNDArray(NPY_UINT64); } Status WriteNext(const std::shared_ptr& col, int64_t placement) override { Type::type type = col->type()->type; - uint64_t* out_buffer = - reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; + C_TYPE* out_buffer = + reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; const ChunkedArray& data = *col->data().get(); - if (type != Type::UINT64) { return Status::NotImplemented(col->type()->ToString()); } + if (type != ARROW_TYPE) { return Status::NotImplemented(col->type()->ToString()); } - ConvertIntegerNoNullsSameType(data, out_buffer); + ConvertIntegerNoNullsSameType(data, out_buffer); placement_data_[current_placement_index_++] = placement; return Status::OK(); } }; +using UInt8Block = IntBlock; +using Int8Block = IntBlock; +using UInt16Block = IntBlock; +using Int16Block = IntBlock; +using UInt32Block = IntBlock; +using Int32Block = IntBlock; +using UInt64Block = IntBlock; +using Int64Block = IntBlock; + class Float32Block : public PandasBlock { public: using PandasBlock::PandasBlock; @@ -1205,9 +1199,9 @@ class Float64Block : public PandasBlock { case Type::INT32: INTEGER_CASE(int32_t); case Type::UINT64: - INTEGER_CASE(int32_t); + INTEGER_CASE(uint64_t); case Type::INT64: - INTEGER_CASE(int32_t); + INTEGER_CASE(int64_t); case Type::FLOAT: ConvertNumericNullableCast(data, NAN, out_buffer); break; @@ -1262,14 +1256,33 @@ class DatetimeBlock : public PandasBlock { Status WriteNext(const std::shared_ptr& col, int64_t placement) override { Type::type type = col->type()->type; - if (type != Type::TIMESTAMP) { - return Status::NotImplemented(col->type()->ToString()); - } - int64_t* out_buffer = reinterpret_cast(block_data_) + current_placement_index_ * num_rows_; - ConvertNumericNullable(*col->data().get(), kPandasTimestampNull, out_buffer); + const ChunkedArray& data = *col.get()->data(); + + if (type == Type::DATE) { + // DateType is millisecond timestamp stored as int64_t + // TODO(wesm): Do we want to make sure to zero out the milliseconds? + ConvertDatetimeNanos(data, out_buffer); + } else if (type == Type::TIMESTAMP) { + auto ts_type = static_cast(col->type().get()); + + if (ts_type->unit == arrow::TimeUnit::NANO) { + ConvertNumericNullable(data, kPandasTimestampNull, out_buffer); + } else if (ts_type->unit == arrow::TimeUnit::MICRO) { + ConvertDatetimeNanos(data, out_buffer); + } else if (ts_type->unit == arrow::TimeUnit::MILLI) { + ConvertDatetimeNanos(data, out_buffer); + } else if (ts_type->unit == arrow::TimeUnit::SECOND) { + ConvertDatetimeNanos(data, out_buffer); + } else { + return Status::NotImplemented("Unsupported time unit"); + } + } else { + return Status::NotImplemented(col->type()->ToString()); + } + placement_data_[current_placement_index_++] = placement; return Status::OK(); } @@ -1279,32 +1292,31 @@ class DatetimeBlock : public PandasBlock { Status MakeBlock(PandasBlock::type type, int64_t num_rows, int num_columns, std::shared_ptr* block) { +#define BLOCK_CASE(NAME, TYPE) \ + case PandasBlock::NAME: \ + *block = std::make_shared(num_rows, num_columns); \ + break; + switch (type) { - case PandasBlock::OBJECT: - *block = std::make_shared(num_rows, num_columns); - break; - case PandasBlock::INT: - *block = std::make_shared(num_rows, num_columns); - break; - case PandasBlock::UINT64: - *block = std::make_shared(num_rows, num_columns); - break; - case PandasBlock::FLOAT: - *block = std::make_shared(num_rows, num_columns); - break; - case PandasBlock::DOUBLE: - *block = std::make_shared(num_rows, num_columns); - break; - case PandasBlock::BOOL: - *block = std::make_shared(num_rows, num_columns); - break; - case PandasBlock::DATETIME: - *block = std::make_shared(num_rows, num_columns); - break; + BLOCK_CASE(OBJECT, ObjectBlock); + BLOCK_CASE(UINT8, UInt8Block); + BLOCK_CASE(INT8, Int8Block); + BLOCK_CASE(UINT16, UInt16Block); + BLOCK_CASE(INT16, Int16Block); + BLOCK_CASE(UINT32, UInt32Block); + BLOCK_CASE(INT32, Int32Block); + BLOCK_CASE(UINT64, UInt64Block); + BLOCK_CASE(INT64, Int64Block); + BLOCK_CASE(FLOAT, Float32Block); + BLOCK_CASE(DOUBLE, Float64Block); + BLOCK_CASE(BOOL, BoolBlock); + BLOCK_CASE(DATETIME, DatetimeBlock); case PandasBlock::CATEGORICAL: return Status::NotImplemented("categorical"); } +#undef BLOCK_CASE + return (*block)->Allocate(); } @@ -1341,14 +1353,28 @@ class DataFrameBlockCreator { output_type = col->null_count() > 0 ? PandasBlock::OBJECT : PandasBlock::BOOL; break; case Type::UINT8: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT8; + break; case Type::INT8: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT8; + break; case Type::UINT16: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT16; + break; case Type::INT16: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT16; + break; case Type::UINT32: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT32; + break; case Type::INT32: - case Type::UINT64: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT32; + break; case Type::INT64: - output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT; + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT64; + break; + case Type::UINT64: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT64; break; case Type::FLOAT: output_type = PandasBlock::FLOAT; @@ -1360,6 +1386,9 @@ class DataFrameBlockCreator { case Type::BINARY: output_type = PandasBlock::OBJECT; break; + case Type::DATE: + output_type = PandasBlock::DATETIME; + break; case Type::TIMESTAMP: output_type = PandasBlock::DATETIME; break; From f22e1b5aa0c31a80f69517a13ecc6c59c4d8c056 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 23 Dec 2016 09:37:36 -0500 Subject: [PATCH 7/7] Remove unneeded import Change-Id: I867430892cdd3903a869c624986a5880266f1b7c --- python/pyarrow/table.pyx | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 74e035a6d62..93755578884 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -623,7 +623,6 @@ cdef class Table: shared_ptr[CColumn] col Column column - from pandas.core.internals import make_block, BlockManager import pandas as pd if block_based: