diff --git a/cpp/cmake/apache-arrow.cmake b/cpp/cmake/apache-arrow.cmake index ccc44d8b3..377122d70 100644 --- a/cpp/cmake/apache-arrow.cmake +++ b/cpp/cmake/apache-arrow.cmake @@ -34,6 +34,13 @@ function(build_arrow) endif () find_package(Threads) + find_package(Arrow QUIET) + set(ARROW_VERSION_TO_BUILD "10.0.1" CACHE INTERNAL "arrow version") + if (Arrow_FOUND) # arrow is installed, build the same version as the installed one + message(STATUS "Found Arrow installed, align to version: ${Arrow_VERSION}") + set(ARROW_VERSION_TO_BUILD "${Arrow_VERSION}" CACHE INTERNAL "arrow version") + endif () + # If Arrow needs to be built, the default location will be within the build tree. set(GAR_ARROW_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/arrow_ep-prefix") @@ -84,10 +91,11 @@ function(build_arrow) set(GAR_ARROW_INCLUDE_DIR "${GAR_ARROW_PREFIX}/include" CACHE INTERNAL "arrow include directory") set(GAR_ARROW_BUILD_BYPRODUCTS "${GAR_ARROW_STATIC_LIB}" "${GAR_PARQUET_STATIC_LIB}") + set(GAR_ARROW_SOURCE_FILE "https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/arrow-${ARROW_VERSION_TO_BUILD}/apache-arrow-${ARROW_VERSION_TO_BUILD}.tar.gz") include(ExternalProject) externalproject_add(arrow_ep - URL https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/arrow-10.0.1/apache-arrow-10.0.1.tar.gz + URL "${GAR_ARROW_SOURCE_FILE}" SOURCE_SUBDIR cpp BINARY_DIR "${GAR_ARROW_BINARY_DIR}" CMAKE_ARGS "${GAR_ARROW_CMAKE_ARGS}" diff --git a/cpp/src/arrow_chunk_writer.cc b/cpp/src/arrow_chunk_writer.cc index 0facfe1c1..97ca134c8 100644 --- a/cpp/src/arrow_chunk_writer.cc +++ b/cpp/src/arrow_chunk_writer.cc @@ -17,7 +17,11 @@ limitations under the License. #include "arrow/api.h" #include "arrow/compute/api.h" +#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000 +#include "arrow/acero/exec_plan.h" +#else #include "arrow/compute/exec/exec_plan.h" +#endif #include "arrow/dataset/dataset.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/file_parquet.h" @@ -29,6 +33,12 @@ limitations under the License. namespace GAR_NAMESPACE_INTERNAL { // common methods +#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000 +namespace arrow_acero_namespace = arrow::acero; +#else +namespace arrow_acero_namespace = arrow::compute; +#endif + #if defined(ARROW_VERSION) && ARROW_VERSION >= 10000000 using AsyncGeneratorType = arrow::AsyncGenerator>; @@ -47,17 +57,21 @@ using AsyncGeneratorType = */ Result> ExecutePlanAndCollectAsTable( const arrow::compute::ExecContext& exec_context, - std::shared_ptr plan, + std::shared_ptr plan, std::shared_ptr schema, AsyncGeneratorType sink_gen) { // translate sink_gen (async) to sink_reader (sync) std::shared_ptr sink_reader = - arrow::compute::MakeGeneratorReader(schema, std::move(sink_gen), - exec_context.memory_pool()); + arrow_acero_namespace::MakeGeneratorReader(schema, std::move(sink_gen), + exec_context.memory_pool()); // validate the ExecPlan RETURN_NOT_ARROW_OK(plan->Validate()); // start the ExecPlan +#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000 + plan->StartProducing(); // arrow 12.0.0 or later return void, not Status +#else RETURN_NOT_ARROW_OK(plan->StartProducing()); +#endif // collect sink_reader into a Table std::shared_ptr response_table; @@ -643,17 +657,17 @@ Result> EdgeChunkWriter::sortTable( const std::shared_ptr& input_table, const std::string& column_name) { auto exec_context = arrow::compute::default_exec_context(); - auto plan = arrow::compute::ExecPlan::Make(exec_context).ValueOrDie(); + auto plan = arrow_acero_namespace::ExecPlan::Make(exec_context).ValueOrDie(); int max_batch_size = 2; - auto table_source_options = - arrow::compute::TableSourceNodeOptions{input_table, max_batch_size}; - auto source = arrow::compute::MakeExecNode("table_source", plan.get(), {}, - table_source_options) + auto table_source_options = arrow_acero_namespace::TableSourceNodeOptions{ + input_table, max_batch_size}; + auto source = arrow_acero_namespace::MakeExecNode("table_source", plan.get(), + {}, table_source_options) .ValueOrDie(); AsyncGeneratorType sink_gen; - if (!arrow::compute::MakeExecNode( + if (!arrow_acero_namespace::MakeExecNode( "order_by_sink", plan.get(), {source}, - arrow::compute::OrderBySinkNodeOptions{ + arrow_acero_namespace::OrderBySinkNodeOptions{ arrow::compute::SortOptions{{arrow::compute::SortKey{ column_name, arrow::compute::SortOrder::Ascending}}}, &sink_gen})