Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Align arrow version to system if arrow installed #162

Merged
merged 3 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cpp/cmake/apache-arrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ function(build_arrow)
endif ()

find_package(Threads)
find_package(Arrow QUIET)
set(ARROW_VERSION_TO_BUILD "10.0.1" CACHE INTERNAL "arrow version")
if (Arrow_FOUND) # arrow is installed, build the same version as the installed one
message(STATUS "Found Arrow installed, align to version: ${Arrow_VERSION}")
set(ARROW_VERSION_TO_BUILD "${Arrow_VERSION}" CACHE INTERNAL "arrow version")
endif ()

# If Arrow needs to be built, the default location will be within the build tree.
set(GAR_ARROW_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/arrow_ep-prefix")

Expand Down Expand Up @@ -84,10 +91,11 @@ function(build_arrow)

set(GAR_ARROW_INCLUDE_DIR "${GAR_ARROW_PREFIX}/include" CACHE INTERNAL "arrow include directory")
set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}")
set(GAR_ARROW_SOURCE_FILE "https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/arrow-${ARROW_VERSION_TO_BUILD}/apache-arrow-${ARROW_VERSION_TO_BUILD}.tar.gz")

include(ExternalProject)
externalproject_add(arrow_ep
URL https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/arrow-10.0.1/apache-arrow-10.0.1.tar.gz
URL "${GAR_ARROW_SOURCE_FILE}"
SOURCE_SUBDIR cpp
BINARY_DIR "${GAR_ARROW_BINARY_DIR}"
CMAKE_ARGS "${GAR_ARROW_CMAKE_ARGS}"
Expand Down
34 changes: 24 additions & 10 deletions cpp/src/arrow_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ limitations under the License.

#include "arrow/api.h"
#include "arrow/compute/api.h"
#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
#include "arrow/acero/exec_plan.h"
#else
#include "arrow/compute/exec/exec_plan.h"
#endif
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_parquet.h"
Expand All @@ -29,6 +33,12 @@ limitations under the License.
namespace GAR_NAMESPACE_INTERNAL {
// common methods

#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
namespace arrow_acero_namespace = arrow::acero;
#else
namespace arrow_acero_namespace = arrow::compute;
#endif

#if defined(ARROW_VERSION) && ARROW_VERSION >= 10000000
using AsyncGeneratorType =
arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>>;
Expand All @@ -47,17 +57,21 @@ using AsyncGeneratorType =
*/
Result<std::shared_ptr<arrow::Table>> ExecutePlanAndCollectAsTable(
const arrow::compute::ExecContext& exec_context,
std::shared_ptr<arrow::compute::ExecPlan> plan,
std::shared_ptr<arrow_acero_namespace::ExecPlan> plan,
std::shared_ptr<arrow::Schema> schema, AsyncGeneratorType sink_gen) {
// translate sink_gen (async) to sink_reader (sync)
std::shared_ptr<arrow::RecordBatchReader> sink_reader =
arrow::compute::MakeGeneratorReader(schema, std::move(sink_gen),
exec_context.memory_pool());
arrow_acero_namespace::MakeGeneratorReader(schema, std::move(sink_gen),
exec_context.memory_pool());

// validate the ExecPlan
RETURN_NOT_ARROW_OK(plan->Validate());
// start the ExecPlan
#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
plan->StartProducing(); // arrow 12.0.0 or later return void, not Status
#else
RETURN_NOT_ARROW_OK(plan->StartProducing());
#endif

// collect sink_reader into a Table
std::shared_ptr<arrow::Table> response_table;
Expand Down Expand Up @@ -643,17 +657,17 @@ Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::sortTable(
const std::shared_ptr<arrow::Table>& input_table,
const std::string& column_name) {
auto exec_context = arrow::compute::default_exec_context();
auto plan = arrow::compute::ExecPlan::Make(exec_context).ValueOrDie();
auto plan = arrow_acero_namespace::ExecPlan::Make(exec_context).ValueOrDie();
int max_batch_size = 2;
auto table_source_options =
arrow::compute::TableSourceNodeOptions{input_table, max_batch_size};
auto source = arrow::compute::MakeExecNode("table_source", plan.get(), {},
table_source_options)
auto table_source_options = arrow_acero_namespace::TableSourceNodeOptions{
input_table, max_batch_size};
auto source = arrow_acero_namespace::MakeExecNode("table_source", plan.get(),
{}, table_source_options)
.ValueOrDie();
AsyncGeneratorType sink_gen;
if (!arrow::compute::MakeExecNode(
if (!arrow_acero_namespace::MakeExecNode(
"order_by_sink", plan.get(), {source},
arrow::compute::OrderBySinkNodeOptions{
arrow_acero_namespace::OrderBySinkNodeOptions{
arrow::compute::SortOptions{{arrow::compute::SortKey{
column_name, arrow::compute::SortOrder::Ascending}}},
&sink_gen})
Expand Down