@@ -17,7 +17,11 @@ limitations under the License.
17
17
18
18
#include " arrow/api.h"
19
19
#include " arrow/compute/api.h"
20
+ #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
21
+ #include " arrow/acero/exec_plan.h"
22
+ #else
20
23
#include " arrow/compute/exec/exec_plan.h"
24
+ #endif
21
25
#include " arrow/dataset/dataset.h"
22
26
#include " arrow/dataset/file_base.h"
23
27
#include " arrow/dataset/file_parquet.h"
@@ -29,6 +33,12 @@ limitations under the License.
29
33
namespace GAR_NAMESPACE_INTERNAL {
30
34
// common methods
31
35
36
+ #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
37
+ namespace arrow_acero_namespace = arrow::acero;
38
+ #else
39
+ namespace arrow_acero_namespace = arrow::compute;
40
+ #endif
41
+
32
42
#if defined(ARROW_VERSION) && ARROW_VERSION >= 10000000
33
43
using AsyncGeneratorType =
34
44
arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>>;
@@ -47,17 +57,21 @@ using AsyncGeneratorType =
47
57
*/
48
58
Result<std::shared_ptr<arrow::Table>> ExecutePlanAndCollectAsTable (
49
59
const arrow::compute::ExecContext& exec_context,
50
- std::shared_ptr<arrow::compute ::ExecPlan> plan,
60
+ std::shared_ptr<arrow_acero_namespace ::ExecPlan> plan,
51
61
std::shared_ptr<arrow::Schema> schema, AsyncGeneratorType sink_gen) {
52
62
// translate sink_gen (async) to sink_reader (sync)
53
63
std::shared_ptr<arrow::RecordBatchReader> sink_reader =
54
- arrow::compute ::MakeGeneratorReader (schema, std::move (sink_gen),
55
- exec_context.memory_pool ());
64
+ arrow_acero_namespace ::MakeGeneratorReader (schema, std::move (sink_gen),
65
+ exec_context.memory_pool ());
56
66
57
67
// validate the ExecPlan
58
68
RETURN_NOT_ARROW_OK (plan->Validate ());
59
69
// start the ExecPlan
70
+ #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
71
+ plan->StartProducing (); // arrow 12.0.0 or later return void, not Status
72
+ #else
60
73
RETURN_NOT_ARROW_OK (plan->StartProducing ());
74
+ #endif
61
75
62
76
// collect sink_reader into a Table
63
77
std::shared_ptr<arrow::Table> response_table;
@@ -643,17 +657,17 @@ Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::sortTable(
643
657
const std::shared_ptr<arrow::Table>& input_table,
644
658
const std::string& column_name) {
645
659
auto exec_context = arrow::compute::default_exec_context ();
646
- auto plan = arrow::compute ::ExecPlan::Make (exec_context).ValueOrDie ();
660
+ auto plan = arrow_acero_namespace ::ExecPlan::Make (exec_context).ValueOrDie ();
647
661
int max_batch_size = 2 ;
648
- auto table_source_options =
649
- arrow::compute::TableSourceNodeOptions{ input_table, max_batch_size};
650
- auto source = arrow::compute:: MakeExecNode (" table_source" , plan.get (), {} ,
651
- table_source_options)
662
+ auto table_source_options = arrow_acero_namespace::TableSourceNodeOptions{
663
+ input_table, max_batch_size};
664
+ auto source = arrow_acero_namespace:: MakeExecNode (" table_source" , plan.get (),
665
+ {}, table_source_options)
652
666
.ValueOrDie ();
653
667
AsyncGeneratorType sink_gen;
654
- if (!arrow::compute ::MakeExecNode (
668
+ if (!arrow_acero_namespace ::MakeExecNode (
655
669
" order_by_sink" , plan.get (), {source},
656
- arrow::compute ::OrderBySinkNodeOptions{
670
+ arrow_acero_namespace ::OrderBySinkNodeOptions{
657
671
arrow::compute::SortOptions{{arrow::compute::SortKey{
658
672
column_name, arrow::compute::SortOrder::Ascending}}},
659
673
&sink_gen})
0 commit comments