From 397b8142e5e32aadbb7c5c9c2e525c8bfbffef0c Mon Sep 17 00:00:00 2001 From: lalalazy12 <434827371@qq.com> Date: Mon, 27 Sep 2021 15:18:35 +0800 Subject: [PATCH] Data transfer with arrow (#4) * Data transfer with arrow * Velox to Arrow * Fix a small bug in Type.h * Clear comments and Create DataUtil class Co-authored-by: lalalazy12 --- velox/external/arrow/abi.h | 103 ++++++++ velox/type/Type.h | 69 ++++++ velox/vector/CMakeLists.txt | 3 +- velox/vector/DataExchangeWithArrow.cpp | 318 +++++++++++++++++++++++++ velox/vector/DataExchangeWithArrow.h | 58 +++++ velox/vector/tests/CMakeLists.txt | 3 +- velox/vector/tests/VectorTest.cpp | 28 +++ 7 files changed, 580 insertions(+), 2 deletions(-) create mode 100644 velox/external/arrow/abi.h create mode 100644 velox/vector/DataExchangeWithArrow.cpp create mode 100644 velox/vector/DataExchangeWithArrow.h diff --git a/velox/external/arrow/abi.h b/velox/external/arrow/abi.h new file mode 100644 index 000000000000..a78170dbdbcb --- /dev/null +++ b/velox/external/arrow/abi.h @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#pragma once
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define ARROW_FLAG_DICTIONARY_ORDERED 1
+#define ARROW_FLAG_NULLABLE 2
+#define ARROW_FLAG_MAP_KEYS_SORTED 4
+
+struct ArrowSchema {
+  // Array type description
+  const char* format;
+  const char* name;
+  const char* metadata;
+  int64_t flags;
+  int64_t n_children;
+  struct ArrowSchema** children;
+  struct ArrowSchema* dictionary;
+
+  // Release callback
+  void (*release)(struct ArrowSchema*);
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+struct ArrowArray {
+  // Array data description
+  int64_t length;
+  int64_t null_count;
+  int64_t offset;
+  int64_t n_buffers;
+  int64_t n_children;
+  const void** buffers;
+  struct ArrowArray** children;
+  struct ArrowArray* dictionary;
+
+  // Release callback
+  void (*release)(struct ArrowArray*);
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+// EXPERIMENTAL: C stream interface
+
+struct ArrowArrayStream {
+  // Callback to get the stream type
+  // (will be the same for all arrays in the stream).
+  //
+  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowSchema must be released independently from the stream.
+  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+
+  // Callback to get the next array
+  // (if no error and the array is released, the stream has ended)
+  //
+  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowArray must be released independently from the stream.
+  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+
+  // Callback to get optional detailed error information.
+  // This must only be called if the last stream operation failed
+  // with a non-0 return code.
+  //
+  // Return value: pointer to a null-terminated character array describing
+  // the last error, or NULL if no description is available.
+  //
+  // The returned pointer is only valid until the next operation on this stream
+  // (including release).
+  const char* (*get_last_error)(struct ArrowArrayStream*);
+
+  // Release callback: release the stream's own resources.
+  // Note that arrays returned by `get_next` must be individually released.
+  void (*release)(struct ArrowArrayStream*);
+
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/velox/type/Type.h b/velox/type/Type.h
index 25f44cbcc3b2..eee11ee0f7c8 100644
--- a/velox/type/Type.h
+++ b/velox/type/Type.h
@@ -1451,6 +1451,51 @@ bool typeExists(const std::string& name);
 /// child types.
 TypePtr getType(const std::string& name, std::vector<TypePtr> childTypes);
 
+/// Invokes TEMPLATE_FUNC<TypeKind>(args...) for the Velox type kind that
+/// corresponds to an Arrow C data interface format character. 'typeKind' must
+/// be a single character ('b', 'c', 's', 'i', 'l', 'f' or 'g').
+#define VELOX_DYNAMIC_ARROW_TYPE_DISPATCH(TEMPLATE_FUNC, typeKind, ...) \
+  VELOX_DYNAMIC_ARROW_TYPE_DISPATCH_IMPL(TEMPLATE_FUNC, , typeKind, __VA_ARGS__)
+
+#define VELOX_DYNAMIC_ARROW_TYPE_DISPATCH_IMPL(PREFIX, SUFFIX, typeKind, ...) \
+  [&]() {                                                                      \
+    switch (typeKind) {                                                        \
+      case 'b': {                                                              \
+        return PREFIX<::facebook::velox::TypeKind::BOOLEAN> SUFFIX(            \
+            __VA_ARGS__);                                                      \
+      }                                                                        \
+      case 'c': {                                                              \
+        return PREFIX<::facebook::velox::TypeKind::TINYINT> SUFFIX(            \
+            __VA_ARGS__);                                                      \
+      }                                                                        \
+      case 's': {                                                              \
+        return PREFIX<::facebook::velox::TypeKind::SMALLINT> SUFFIX(           \
+            __VA_ARGS__);                                                      \
+      }                                                                        \
+      case 'i': {                                                              \
+        return PREFIX<::facebook::velox::TypeKind::INTEGER> SUFFIX(            \
+            __VA_ARGS__);                                                      \
+      }                                                                        \
+      case 'l': {                                                              \
+        return PREFIX<::facebook::velox::TypeKind::BIGINT> SUFFIX(             \
+            __VA_ARGS__);                                                      \
+      }                                                                        \
+      case 'f': {                                                              \
+        return PREFIX<::facebook::velox::TypeKind::REAL> SUFFIX(               \
+            __VA_ARGS__);                                                      \
+      }                                                                        \
+      case 'g': {                                                              \
+        return PREFIX<::facebook::velox::TypeKind::DOUBLE> SUFFIX(             \
+            __VA_ARGS__);                                                      \
+      }                                                                        \
+      default:                                                                 \
+        VELOX_FAIL("Unsupported Arrow format character: {}", typeKind);        \
+    }                                                                          \
+  }()
+
+// TODO: Add cases for the Arrow formats of VARCHAR, VARBINARY, TIMESTAMP,
+// ARRAY, MAP and ROW once conversion for these types is implemented.
+
 } // namespace facebook::velox
 
 namespace folly {
diff --git a/velox/vector/CMakeLists.txt b/velox/vector/CMakeLists.txt
index 98c4267e315e..413a29b29f85 100644
--- a/velox/vector/CMakeLists.txt
+++ b/velox/vector/CMakeLists.txt
@@ -23,7 +23,8 @@ add_library(
   SelectivityVector.cpp
   SimpleVector.cpp
   VectorStream.cpp
-  VectorEncoding.cpp)
+  VectorEncoding.cpp
+  DataExchangeWithArrow.cpp)
 
 target_link_libraries(velox_vector velox_memory velox_type velox_encode)
 
diff --git a/velox/vector/DataExchangeWithArrow.cpp b/velox/vector/DataExchangeWithArrow.cpp
new file mode 100644
index 000000000000..43a917bc8348
--- /dev/null
+++ b/velox/vector/DataExchangeWithArrow.cpp
@@ -0,0 +1,318 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Utilities for zero-copy data exchange between Velox vectors and the Arrow C
+ * data interface (https://arrow.apache.org/docs/format/CDataInterface.html).
+ * Only flat vectors of primitive types can be exported so far.
+ */
+#include "velox/vector/DataExchangeWithArrow.h"
+
+#include <cstdlib>
+#include <memory>
+#include <new>
+#include <utility>
+#include <vector>
+
+#include "velox/vector/FlatVector.h"
+
+namespace facebook {
+namespace velox {
+
+namespace {
+
+// Allocates a zero-initialized C array of 'count' elements. The result must be
+// released with std::free(). Throws std::bad_alloc if the allocation fails.
+template <typename T>
+T* allocateZeroed(size_t count) {
+  // calloc(0, ...) may return nullptr, so always ask for at least one element.
+  void* memory = std::calloc(count == 0 ? 1 : count, sizeof(T));
+  if (memory == nullptr) {
+    throw std::bad_alloc();
+  }
+  return static_cast<T*>(memory);
+}
+
+// Returns the raw values of 'vector', which must be a FlatVector holding the
+// native type of 'kind'. Returns nullptr if the vector has no values buffer.
+template <TypeKind kind>
+const void* flatValues(const VectorPtr& vector) {
+  using T = typename TypeTraits<kind>::NativeType;
+  const auto* flat = vector->asFlatVector<T>();
+  if (flat == nullptr) {
+    VELOX_FAIL("Only flat vectors can be exported to Arrow");
+  }
+  const BufferPtr& values = flat->values();
+  return values ? values->as<T>() : nullptr;
+}
+
+// Returns the values buffer to export for a primitive child vector. Fails for
+// types that this exporter does not support yet.
+const void* primitiveValues(const VectorPtr& vector) {
+  switch (vector->typeKind()) {
+    case TypeKind::BOOLEAN:
+      return flatValues<TypeKind::BOOLEAN>(vector);
+    case TypeKind::TINYINT:
+      return flatValues<TypeKind::TINYINT>(vector);
+    case TypeKind::SMALLINT:
+      return flatValues<TypeKind::SMALLINT>(vector);
+    case TypeKind::INTEGER:
+      return flatValues<TypeKind::INTEGER>(vector);
+    case TypeKind::BIGINT:
+      return flatValues<TypeKind::BIGINT>(vector);
+    case TypeKind::REAL:
+      return flatValues<TypeKind::REAL>(vector);
+    case TypeKind::DOUBLE:
+      return flatValues<TypeKind::DOUBLE>(vector);
+    default:
+      // VARCHAR, VARBINARY and TIMESTAMP values are not laid out the way Arrow
+      // expects, so exporting their buffers as-is would hand out garbage.
+      VELOX_FAIL(
+          "Unsupported type for Arrow export: {}",
+          mapTypeKindToName(vector->typeKind()));
+  }
+}
+
+} // namespace
+
+// Exports 'row_vector_ptr' through the Arrow C data interface without copying
+// the underlying buffers. The caller owns the returned structs: call
+// release() on each of them and then delete them.
+// Fails if a child is not a flat vector of a supported primitive type.
+// Reference:
+// https://arrow.apache.org/docs/format/CDataInterface.html#exporting-a-struct-float32-utf8-array
+std::pair<ArrowArray*, ArrowSchema*> DataUtil::VeloxToArrow(
+    const RowVectorPtr& row_vector_ptr) {
+  if (row_vector_ptr == nullptr) {
+    VELOX_FAIL("Cannot export a null RowVector to Arrow");
+  }
+  // Value-initialization zeroes both structs, so 'release' starts out null.
+  auto schema = std::make_unique<ArrowSchema>();
+  auto array = std::make_unique<ArrowArray>();
+  export_schema_from_Velox_type(row_vector_ptr, schema.get());
+  try {
+    export_from_velox(row_vector_ptr, array.get());
+  } catch (...) {
+    // Do not leak the schema contents when the array export fails.
+    schema->release(schema.get());
+    throw;
+  }
+  return std::make_pair(array.release(), schema.release());
+}
+
+// Release callback shared by the parent array and its children. Frees what
+// export_from_velox() allocated and drops the reference that keeps the Velox
+// buffers alive. The struct that 'array' points to is not freed here.
+void DataUtil::release_malloced_array(struct ArrowArray* array) {
+  if (array == nullptr || array->release == nullptr) {
+    return; // Already released.
+  }
+  if (array->children != nullptr) {
+    for (int64_t i = 0; i < array->n_children; ++i) {
+      struct ArrowArray* child = array->children[i];
+      if (child == nullptr) {
+        continue; // Export failed before this child was allocated.
+      }
+      if (child->release != nullptr) {
+        child->release(child);
+      }
+      // The child struct itself was allocated by export_from_velox().
+      std::free(child);
+    }
+    std::free(array->children);
+  }
+  // Only the pointer table is ours; the buffers it points to belong to Velox.
+  std::free(array->buffers);
+  delete static_cast<VectorPtr*>(array->private_data);
+  array->children = nullptr;
+  array->buffers = nullptr;
+  array->private_data = nullptr;
+  // Mark released.
+  array->release = nullptr;
+}
+
+// Fills 'array' with a struct array whose children point at the buffers of
+// the children of 'row_vector_ptr'. Only flat children of primitive types are
+// supported; unsupported input is rejected before anything is allocated.
+void DataUtil::export_from_velox(
+    const RowVectorPtr& row_vector_ptr,
+    struct ArrowArray* array) {
+  if (row_vector_ptr == nullptr || array == nullptr) {
+    VELOX_FAIL("export_from_velox requires a vector and an output array");
+  }
+  const size_t numChildren = row_vector_ptr->childrenSize();
+
+  // Resolve every child first so that unsupported input is rejected before
+  // anything has been allocated.
+  std::vector<const void*> childValues(numChildren);
+  for (size_t i = 0; i < numChildren; ++i) {
+    const VectorPtr& child = row_vector_ptr->childAt(i);
+    if (child == nullptr) {
+      VELOX_FAIL("Cannot export a RowVector with a null child to Arrow");
+    }
+    childValues[i] = primitiveValues(child);
+  }
+
+  // Initializes the fields shared by the parent and the child arrays.
+  // buffers[0] is always the validity bitmap.
+  const auto initialize = [](const VectorPtr& vector,
+                             int64_t numBuffers,
+                             struct ArrowArray* out) {
+    *out = ArrowArray{};
+    // Installed first so that release() can clean up a partially built array.
+    out->release = &release_malloced_array;
+    // Keeps the Velox buffers alive until the consumer releases the array.
+    out->private_data = new VectorPtr(vector);
+    out->buffers = allocateZeroed<const void*>(numBuffers);
+    out->n_buffers = numBuffers;
+    out->length = vector->size();
+    const uint64_t* nulls = vector->rawNulls();
+    out->buffers[0] = nulls;
+    // Without a bitmap there are no nulls; otherwise -1 means "not computed".
+    out->null_count = nulls == nullptr
+        ? 0
+        : static_cast<int64_t>(vector->getNullCount().value_or(-1));
+  };
+
+  try {
+    // A struct array only carries the validity bitmap.
+    initialize(row_vector_ptr, 1, array);
+    array->children = allocateZeroed<struct ArrowArray*>(numChildren);
+    array->n_children = static_cast<int64_t>(numChildren);
+    for (size_t i = 0; i < numChildren; ++i) {
+      auto* child = allocateZeroed<struct ArrowArray>(1);
+      array->children[i] = child;
+      // Primitive arrays have two buffers: validity bitmap and values.
+      initialize(row_vector_ptr->childAt(i), 2, child);
+      child->buffers[1] = childValues[i];
+    }
+  } catch (...) {
+    release_malloced_array(array);
+    throw;
+  }
+}
+
+// Release callback shared by the parent schema and its children. Frees what
+// export_schema_from_Velox_type() allocated; the struct itself is not freed.
+void DataUtil::release_malloced_schema(struct ArrowSchema* schema) {
+  if (schema == nullptr || schema->release == nullptr) {
+    return; // Already released.
+  }
+  if (schema->children != nullptr) {
+    for (int64_t i = 0; i < schema->n_children; ++i) {
+      struct ArrowSchema* child = schema->children[i];
+      if (child == nullptr) {
+        continue; // Export failed before this child was allocated.
+      }
+      if (child->release != nullptr) {
+        child->release(child);
+      }
+      std::free(child);
+    }
+    std::free(schema->children);
+  }
+  // Drops the reference to the Velox type that backs the 'name' strings.
+  delete static_cast<std::shared_ptr<const Type>*>(schema->private_data);
+  schema->children = nullptr;
+  schema->private_data = nullptr;
+  // Mark released.
+  schema->release = nullptr;
+}
+
+// Describes the type of 'row_vector_ptr' as an Arrow struct schema with one
+// nullable child per column. Fails before allocating anything if a column
+// type has no Arrow format yet.
+void DataUtil::export_schema_from_Velox_type(
+    const RowVectorPtr& row_vector_ptr,
+    struct ArrowSchema* schema) {
+  if (row_vector_ptr == nullptr || schema == nullptr) {
+    VELOX_FAIL("export_schema_from_Velox_type requires a vector and a schema");
+  }
+  const std::shared_ptr<const Type> type = row_vector_ptr->type();
+  // Bound by reference: the child names exported below point into this type.
+  const auto& rowType = type->asRow();
+  const size_t numChildren = row_vector_ptr->childrenSize();
+
+  std::vector<const char*> formats(numChildren);
+  for (size_t i = 0; i < numChildren; ++i) {
+    formats[i] = SwitchFormat(rowType.childAt(i));
+  }
+
+  *schema = ArrowSchema{};
+  // Installed first so that release() can clean up a partially built schema.
+  schema->release = &release_malloced_schema;
+  try {
+    schema->private_data = new std::shared_ptr<const Type>(type);
+    schema->format = "+s";
+    schema->name = "";
+    schema->flags = 0; // TODO: report nullability of the struct itself.
+    schema->children = allocateZeroed<struct ArrowSchema*>(numChildren);
+    schema->n_children = static_cast<int64_t>(numChildren);
+    for (size_t i = 0; i < numChildren; ++i) {
+      auto* child = allocateZeroed<struct ArrowSchema>(1);
+      schema->children[i] = child;
+      child->release = &release_malloced_schema;
+      // 'name' points into the row type, so each child keeps the type alive.
+      child->private_data = new std::shared_ptr<const Type>(type);
+      child->format = formats[i];
+      child->name = rowType.nameOf(i).c_str();
+      child->flags = ARROW_FLAG_NULLABLE;
+    }
+  } catch (...) {
+    release_malloced_schema(schema);
+    throw;
+  }
+}
+
+// Maps a Velox type to its Arrow C data interface format string. The result
+// is a string literal, so it stays valid for the lifetime of the program.
+const char* DataUtil::SwitchFormat(const std::shared_ptr<const Type>& type) {
+  if (type == nullptr) {
+    VELOX_FAIL("Cannot map a null type to an Arrow format");
+  }
+  switch (type->kind()) {
+    case TypeKind::BOOLEAN:
+      return "b";
+    case TypeKind::TINYINT:
+      return "c";
+    case TypeKind::SMALLINT:
+      return "s";
+    case TypeKind::INTEGER:
+      return "i";
+    case TypeKind::BIGINT:
+      return "l";
+    case TypeKind::REAL:
+      return "f";
+    case TypeKind::DOUBLE:
+      return "g";
+    default:
+      VELOX_FAIL(
+          "Unsupported type for Arrow export: {}",
+          mapTypeKindToName(type->kind()));
+  }
+}
+
+// Arrow -> Velox import is still work in progress. The plan is to convert each
+// child with VELOX_DYNAMIC_ARROW_TYPE_DISPATCH on the first character of its
+// format string. Until then fail loudly instead of returning garbage.
+RowVectorPtr DataUtil::ArrowToVelox(
+    ArrowArray* /*arrow_array*/,
+    ArrowSchema /*schema*/) {
+  VELOX_FAIL("DataUtil::ArrowToVelox is not implemented yet");
+}
+
+} // namespace velox
+} // namespace facebook
diff --git a/velox/vector/DataExchangeWithArrow.h b/velox/vector/DataExchangeWithArrow.h
new file mode 100644
index 000000000000..2d3a897adfc7
--- /dev/null
+++ b/velox/vector/DataExchangeWithArrow.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "velox/external/arrow/abi.h"
+#include "velox/type/Type.h"
+#include "velox/vector/BaseVector.h"
+#include "velox/vector/ComplexVector.h"
+
+namespace facebook::velox {
+
+/// Utility class for zero-copy data exchange between Velox vectors and the
+/// Arrow C data interface structs declared in velox/external/arrow/abi.h.
+class DataUtil {
+ public:
+  // Zero-copy exchange of data from Velox to Arrow.
+  // Input: velox::RowVectorPtr
+  // Output: std::pair<ArrowArray*, ArrowSchema*>
+  static std::pair<ArrowArray*, ArrowSchema*> VeloxToArrow(
+      const RowVectorPtr& row_vector_ptr);
+
+  // Zero-copy exchange of data from Arrow to Velox.
+  // Input: ArrowArray* and the ArrowSchema that describes it
+  // Output: velox::RowVectorPtr
+  static velox::RowVectorPtr ArrowToVelox(
+      ArrowArray* arrow_array,
+      ArrowSchema schema);
+
+ private:
+  // Helper functions for VeloxToArrow.
+  static const char* SwitchFormat(const std::shared_ptr<const Type>& type);
+  static void release_malloced_array(struct ArrowArray* array);
+  static void export_from_velox(
+      const RowVectorPtr& row_vector_ptr,
+      struct ArrowArray* array);
+  static void release_malloced_schema(struct ArrowSchema* schema);
+  static void export_schema_from_Velox_type(
+      const RowVectorPtr& row_vector_ptr,
+      struct ArrowSchema* schema);
+};
+
+} // namespace facebook::velox
diff --git a/velox/vector/tests/CMakeLists.txt b/velox/vector/tests/CMakeLists.txt
index e9bb183cd6ac..47e7b4ac068a 100644
--- a/velox/vector/tests/CMakeLists.txt
+++ b/velox/vector/tests/CMakeLists.txt
@@ -17,7 +17,8 @@ target_link_libraries(velox_vector_test_lib velox_vector)
 
 add_executable(
   velox_vector_test VectorMakerTest.cpp VectorTest.cpp DecodedVectorTest.cpp
-  SelectivityVectorTest.cpp EnsureWritableVectorTest.cpp)
+  SelectivityVectorTest.cpp EnsureWritableVectorTest.cpp
+  )
 
 add_test(velox_vector_test velox_vector_test)
 
diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp
index 9255877ff163..13dc53e8299c 100644
--- a/velox/vector/tests/VectorTest.cpp
+++ b/velox/vector/tests/VectorTest.cpp
@@ -24,6 +24,7 @@
 #include "velox/vector/FlatVector.h"
 #include "velox/vector/LazyVector.h"
 #include "velox/vector/tests/VectorMaker.h"
+#include "velox/vector/DataExchangeWithArrow.h"
 
 using namespace facebook::velox;
 using facebook::velox::ComplexType;
@@ -159,6 +160,22 @@ class VectorTest : public testing::Test {
         size * sizeof(T));
   }
 
+  RowVectorPtr createSimpleRow(int32_t numRows, bool withNulls) {
+    auto row_Type =
+        ROW({"c0_bigint", "c1_real"}, {BIGINT(), REAL()});
+
+    std::vector<VectorPtr> row_vector = {
+        createScalar<int64_t>(BIGINT(), numRows, withNulls),
+        createScalar<float>(REAL(), numRows, withNulls)};
+    return std::make_shared<RowVector>(
+        pool_.get(),
+        row_Type,
+        BufferPtr(nullptr),
+        numRows,
+        std::move(row_vector),
+        0 /*nullCount*/);
+  }
+
   VectorPtr createRow(int32_t numRows, bool withNulls) {
     auto childType =
         ROW({"child_bigint", "child_string"}, {BIGINT(), VARCHAR()});
@@ -1210,3 +1227,29 @@ TEST_F(VectorTest, valueHook) {
   lazy->load(rows, &hook);
   EXPECT_EQ(hook.errors(), 0);
 }
+
+TEST_F(VectorTest, RowVector2ArrowArray) {
+  RowVectorPtr baseRow = createSimpleRow(vectorSize_, true);
+  std::pair<ArrowArray*, ArrowSchema*> arrowPair =
+      DataUtil::VeloxToArrow(baseRow);
+  ArrowArray* arrow_array = arrowPair.first;
+  ArrowSchema* arrow_schema = arrowPair.second;
+  ASSERT_NE(arrow_array, nullptr);
+  ASSERT_NE(arrow_schema, nullptr);
+
+  // The row is exported as a struct with one child per column.
+  EXPECT_STREQ(arrow_schema->format, "+s");
+  ASSERT_EQ(arrow_schema->n_children, 2);
+  EXPECT_EQ(arrow_array->length, baseRow->size());
+  ASSERT_EQ(arrow_array->n_children, 2);
+  for (int64_t i = 0; i < arrow_array->n_children; ++i) {
+    EXPECT_EQ(arrow_array->children[i]->length, baseRow->childAt(i)->size());
+    EXPECT_EQ(arrow_array->children[i]->n_buffers, 2);
+  }
+
+  // The caller owns both structs: release their contents, then free them.
+  arrow_array->release(arrow_array);
+  arrow_schema->release(arrow_schema);
+  delete arrow_array;
+  delete arrow_schema;
+}