diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index ab3358d666496..6e76cd6468783 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -404,24 +404,32 @@ ExecPlan_create <- function(use_threads) {
   .Call(`_arrow_ExecPlan_create`, use_threads)
 }
 
-ExecPlan_run <- function(plan, final_node, sort_options, metadata, head) {
-  .Call(`_arrow_ExecPlan_run`, plan, final_node, sort_options, metadata, head)
+ExecPlanReader__batches <- function(reader) {
+  .Call(`_arrow_ExecPlanReader__batches`, reader)
 }
 
-ExecPlan_read_table <- function(plan, final_node, sort_options, metadata, head) {
-  .Call(`_arrow_ExecPlan_read_table`, plan, final_node, sort_options, metadata, head)
+Table__from_ExecPlanReader <- function(reader) {
+  .Call(`_arrow_Table__from_ExecPlanReader`, reader)
 }
 
-ExecPlan_StopProducing <- function(plan) {
-  invisible(.Call(`_arrow_ExecPlan_StopProducing`, plan))
+ExecPlanReader__Plan <- function(reader) {
+  .Call(`_arrow_ExecPlanReader__Plan`, reader)
 }
 
-ExecNode_output_schema <- function(node) {
-  .Call(`_arrow_ExecNode_output_schema`, node)
+ExecPlanReader__PlanStatus <- function(reader) {
+  .Call(`_arrow_ExecPlanReader__PlanStatus`, reader)
+}
+
+ExecPlan_run <- function(plan, final_node, sort_options, metadata, head) {
+  .Call(`_arrow_ExecPlan_run`, plan, final_node, sort_options, metadata, head)
 }
 
-ExecPlan_BuildAndShow <- function(plan, final_node, sort_options, head) {
-  .Call(`_arrow_ExecPlan_BuildAndShow`, plan, final_node, sort_options, head)
+ExecPlan_ToString <- function(plan) {
+  .Call(`_arrow_ExecPlan_ToString`, plan)
+}
+
+ExecNode_output_schema <- function(node) {
+  .Call(`_arrow_ExecNode_output_schema`, node)
 }
 
 ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) {
@@ -1728,6 +1736,10 @@ RecordBatchReader__schema <- function(reader) {
   .Call(`_arrow_RecordBatchReader__schema`, reader)
 }
 
+RecordBatchReader__Close <- function(reader) {
+  invisible(.Call(`_arrow_RecordBatchReader__Close`, reader))
+}
+
 RecordBatchReader__ReadNext <- function(reader) {
   .Call(`_arrow_RecordBatchReader__ReadNext`, reader)
 }
diff --git a/r/R/compute.R b/r/R/compute.R
index 636c9146ca37b..a144e7d678a1a 100644
--- a/r/R/compute.R
+++ b/r/R/compute.R
@@ -385,13 +385,6 @@ register_scalar_function <- function(name, fun, in_type, out_type,
     update_cache = TRUE
   )
 
-  # User-defined functions require some special handling
-  # in the query engine which currently require an opt-in using
-  # the R_ARROW_COLLECT_WITH_UDF environment variable while this
-  # behaviour is stabilized.
-  # TODO(ARROW-17178) remove the need for this!
-  Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")
-
   invisible(NULL)
 }
 
diff --git a/r/R/dplyr.R b/r/R/dplyr.R
index dffe269199c86..86132d8ae4aed 100644
--- a/r/R/dplyr.R
+++ b/r/R/dplyr.R
@@ -266,7 +266,7 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) {
 #' show_exec_plan()
 show_exec_plan <- function(x) {
   adq <- as_adq(x)
-  plan <- ExecPlan$create()
+
   # do not show the plan if we have a nested query (as this will force the
   # evaluation of the inner query/queries)
   # TODO see if we can remove after ARROW-16628
@@ -274,8 +274,11 @@ show_exec_plan <- function(x) {
     warn("The `ExecPlan` cannot be printed for a nested query.")
     return(invisible(x))
   }
-  final_node <- plan$Build(adq)
-  cat(plan$BuildAndShow(final_node))
+
+  result <- as_record_batch_reader(adq)
+  cat(result$Plan()$ToString())
+  result$Close()
+
   invisible(x)
 }
 
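Note: with the change above, `show_exec_plan()` builds the plan through `as_record_batch_reader()` and prints it without ever starting execution, then closes the reader. A minimal sketch of the intended usage (the `mtcars` query is illustrative, not part of this patch):

```r
library(arrow)
library(dplyr)

# Printing the plan no longer executes the query; the ExecPlanReader is
# closed explicitly once the plan has been formatted.
arrow_table(mtcars) %>%
  filter(cyl == 6) %>%
  show_exec_plan()
```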
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index c132b291b872b..89a8c6a7f3793 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -194,13 +194,11 @@ ExecPlan <- R6Class("ExecPlan",
       }
       node
     },
-    Run = function(node, as_table = FALSE) {
-      # a section of this code is used by `BuildAndShow()` too - the 2 need to be in sync
-      # Start of chunk used in `BuildAndShow()`
+    Run = function(node) {
       assert_is(node, "ExecNode")
 
       # Sorting and head/tail (if sorted) are handled in the SinkNode,
-      # created in ExecPlan_run
+      # created in ExecPlan_build
       sorting <- node$extras$sort %||% list()
       select_k <- node$extras$head %||% -1L
       has_sorting <- length(sorting) > 0
@@ -214,16 +212,7 @@ ExecPlan <- R6Class("ExecPlan",
         sorting$orders <- as.integer(sorting$orders)
       }
 
-      # End of chunk used in `BuildAndShow()`
-
-      # If we are going to return a Table anyway, we do this in one step and
-      # entirely in one C++ call to ensure that we can execute user-defined
-      # functions from the worker threads spawned by the ExecPlan. If not, we
-      # use ExecPlan_run which returns a RecordBatchReader that can be
-      # manipulated in R code (but that right now won't work with
-      # user-defined functions).
-      exec_fun <- if (as_table) ExecPlan_read_table else ExecPlan_run
-      out <- exec_fun(
+      out <- ExecPlan_run(
         self,
         node,
         sorting,
@@ -240,18 +229,13 @@ ExecPlan <- R6Class("ExecPlan",
         slice_size <- node$extras$head %||% node$extras$tail
         if (!is.null(slice_size)) {
           out <- head(out, slice_size)
-          # We already have everything we need for the head, so StopProducing
-          self$Stop()
         }
       } else if (!is.null(node$extras$tail)) {
         # TODO(ARROW-16630): proper BottomK support
         # Reverse the row order to get back what we expect
         out <- as_arrow_table(out)
         out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
-        # Put back into RBR
-        if (!as_table) {
-          out <- as_record_batch_reader(out)
-        }
+        out <- as_record_batch_reader(out)
       }
 
       # If arrange() created $temp_columns, make sure to omit them from the result
@@ -261,11 +245,7 @@ ExecPlan <- R6Class("ExecPlan",
       if (length(node$extras$sort$temp_columns) > 0) {
         tab <- as_arrow_table(out)
         tab <- tab[, setdiff(names(tab), node$extras$sort$temp_columns), drop = FALSE]
-        if (!as_table) {
-          out <- as_record_batch_reader(tab)
-        } else {
-          out <- tab
-        }
+        out <- as_record_batch_reader(tab)
       }
 
       out
@@ -279,40 +259,9 @@ ExecPlan <- R6Class("ExecPlan",
         ...
       )
     },
-    # SinkNodes (involved in arrange and/or head/tail operations) are created in
-    # ExecPlan_run and are not captured by the regulat print method. We take a
-    # similar approach to expose them before calling the print method.
-    BuildAndShow = function(node) {
-      # a section of this code is copied from `Run()` - the 2 need to be in sync
-      # Start of chunk copied from `Run()`
-
-      assert_is(node, "ExecNode")
-
-      # Sorting and head/tail (if sorted) are handled in the SinkNode,
-      # created in ExecPlan_run
-      sorting <- node$extras$sort %||% list()
-      select_k <- node$extras$head %||% -1L
-      has_sorting <- length(sorting) > 0
-      if (has_sorting) {
-        if (!is.null(node$extras$tail)) {
-          # Reverse the sort order and take the top K, then after we'll reverse
-          # the resulting rows so that it is ordered as expected
-          sorting$orders <- !sorting$orders
-          select_k <- node$extras$tail
-        }
-        sorting$orders <- as.integer(sorting$orders)
-      }
-
-      # End of chunk copied from `Run()`
-
-      ExecPlan_BuildAndShow(
-        self,
-        node,
-        sorting,
-        select_k
-      )
-    },
-    Stop = function() ExecPlan_StopProducing(self)
+    ToString = function() {
+      ExecPlan_ToString(self)
+    }
   )
 )
 # nolint end.
@@ -396,6 +345,23 @@ ExecNode <- R6Class("ExecNode",
   )
 )
 
+ExecPlanReader <- R6Class("ExecPlanReader",
+  inherit = RecordBatchReader,
+  public = list(
+    batches = function() ExecPlanReader__batches(self),
+    read_table = function() Table__from_ExecPlanReader(self),
+    Plan = function() ExecPlanReader__Plan(self),
+    PlanStatus = function() ExecPlanReader__PlanStatus(self),
+    ToString = function() {
+      sprintf(
+        "<Status: %s>\n\n%s\n\nSee $Plan() for details.",
+        self$PlanStatus(),
+        super$ToString()
+      )
+    }
+  )
+)
+
 do_exec_plan_substrait <- function(substrait_plan) {
   if (is.string(substrait_plan)) {
     substrait_plan <- substrait__internal__SubstraitFromJSON(substrait_plan)
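The `ExecPlanReader` R6 class above mirrors the new C++ class of the same name (see `r/src/compute-exec.cpp` further down). A hedged sketch of how its methods fit together; `as_adq()` is an internal arrow helper, used here the same way the package's tests use it:

```r
library(arrow)

reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:10)))

reader$PlanStatus()            # "PLAN_NOT_STARTED": nothing has run yet
cat(reader$Plan()$ToString())  # inspect the underlying ExecPlan

tab <- reader$read_table()     # drains the reader in one C++ call
reader$PlanStatus()            # "PLAN_FINISHED"
```

Printing the reader goes through its `ToString()`, which reports `PlanStatus()` alongside the schema and points at `$Plan()` for details; `$Plan()` itself errors once the reader has finished.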
diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R
index 3a985d8abceaa..e1dd52ed715a5 100644
--- a/r/R/record-batch-reader.R
+++ b/r/R/record-batch-reader.R
@@ -98,6 +98,7 @@ RecordBatchReader <- R6Class("RecordBatchReader",
     read_next_batch = function() RecordBatchReader__ReadNext(self),
     batches = function() RecordBatchReader__batches(self),
     read_table = function() Table__from_RecordBatchReader(self),
+    Close = function() RecordBatchReader__Close(self),
     export_to_c = function(stream_ptr) ExportRecordBatchReader(self, stream_ptr),
     ToString = function() self$schema$ToString()
   ),
diff --git a/r/R/table.R b/r/R/table.R
index d7e276415c5cd..c529125779254 100644
--- a/r/R/table.R
+++ b/r/R/table.R
@@ -328,15 +328,5 @@ as_arrow_table.RecordBatchReader <- function(x, ...) {
 #' @rdname as_arrow_table
 #' @export
 as_arrow_table.arrow_dplyr_query <- function(x, ...) {
-  # See query-engine.R for ExecPlan/Nodes
-  plan <- ExecPlan$create()
-  final_node <- plan$Build(x)
-
-  run_with_event_loop <- identical(
-    Sys.getenv("R_ARROW_COLLECT_WITH_UDF", ""),
-    "true"
-  )
-
-  result <- plan$Run(final_node, as_table = run_with_event_loop)
-  as_arrow_table(result)
+  as_arrow_table(as_record_batch_reader(x))
 }
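With `as_arrow_table.arrow_dplyr_query()` reduced to a one-liner, every collection path funnels through `as_record_batch_reader()`, and table materialization happens in one C++ call (`Table__from_ExecPlanReader()`), which is what removes the need for the `R_ARROW_COLLECT_WITH_UDF` opt-in. A sketch, again assuming the internal `as_adq()` helper:

```r
query <- dplyr::filter(as_adq(arrow_table(x = 1:5)), x > 2)

# These are now equivalent: both evaluate the same ExecPlan and build
# the Table inside a single C++ call.
tab1 <- as_arrow_table(query)
tab2 <- as_record_batch_reader(query)$read_table()
```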
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index adb6636e9ee0c..26ec6e3d9b19f 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -869,36 +869,55 @@ BEGIN_CPP11
 END_CPP11
 }
 // compute-exec.cpp
-std::shared_ptr<arrow::RecordBatchReader> ExecPlan_run(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head);
-extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){
+cpp11::list ExecPlanReader__batches(const std::shared_ptr<ExecPlanReader>& reader);
+extern "C" SEXP _arrow_ExecPlanReader__batches(SEXP reader_sexp){
 BEGIN_CPP11
-  arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
-  arrow::r::Input<const std::shared_ptr<compute::ExecNode>&>::type final_node(final_node_sexp);
-  arrow::r::Input<cpp11::list>::type sort_options(sort_options_sexp);
-  arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp);
-  arrow::r::Input<int64_t>::type head(head_sexp);
-  return cpp11::as_sexp(ExecPlan_run(plan, final_node, sort_options, metadata, head));
+  arrow::r::Input<const std::shared_ptr<ExecPlanReader>&>::type reader(reader_sexp);
+  return cpp11::as_sexp(ExecPlanReader__batches(reader));
+END_CPP11
+}
+// compute-exec.cpp
+std::shared_ptr<arrow::Table> Table__from_ExecPlanReader(const std::shared_ptr<ExecPlanReader>& reader);
+extern "C" SEXP _arrow_Table__from_ExecPlanReader(SEXP reader_sexp){
+BEGIN_CPP11
+  arrow::r::Input<const std::shared_ptr<ExecPlanReader>&>::type reader(reader_sexp);
+  return cpp11::as_sexp(Table__from_ExecPlanReader(reader));
+END_CPP11
+}
+// compute-exec.cpp
+std::shared_ptr<compute::ExecPlan> ExecPlanReader__Plan(const std::shared_ptr<ExecPlanReader>& reader);
+extern "C" SEXP _arrow_ExecPlanReader__Plan(SEXP reader_sexp){
+BEGIN_CPP11
+  arrow::r::Input<const std::shared_ptr<ExecPlanReader>&>::type reader(reader_sexp);
+  return cpp11::as_sexp(ExecPlanReader__Plan(reader));
+END_CPP11
+}
+// compute-exec.cpp
+std::string ExecPlanReader__PlanStatus(const std::shared_ptr<ExecPlanReader>& reader);
+extern "C" SEXP _arrow_ExecPlanReader__PlanStatus(SEXP reader_sexp){
+BEGIN_CPP11
+  arrow::r::Input<const std::shared_ptr<ExecPlanReader>&>::type reader(reader_sexp);
+  return cpp11::as_sexp(ExecPlanReader__PlanStatus(reader));
 END_CPP11
 }
 // compute-exec.cpp
-std::shared_ptr<arrow::Table> ExecPlan_read_table(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head);
-extern "C" SEXP _arrow_ExecPlan_read_table(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){
+std::shared_ptr<ExecPlanReader> ExecPlan_run(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head);
+extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP metadata_sexp, SEXP head_sexp){
 BEGIN_CPP11
   arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
   arrow::r::Input<const std::shared_ptr<compute::ExecNode>&>::type final_node(final_node_sexp);
   arrow::r::Input<cpp11::list>::type sort_options(sort_options_sexp);
   arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp);
   arrow::r::Input<int64_t>::type head(head_sexp);
-  return cpp11::as_sexp(ExecPlan_read_table(plan, final_node, sort_options, metadata, head));
+  return cpp11::as_sexp(ExecPlan_run(plan, final_node, sort_options, metadata, head));
 END_CPP11
 }
 // compute-exec.cpp
-void ExecPlan_StopProducing(const std::shared_ptr<compute::ExecPlan>& plan);
-extern "C" SEXP _arrow_ExecPlan_StopProducing(SEXP plan_sexp){
+std::string ExecPlan_ToString(const std::shared_ptr<compute::ExecPlan>& plan);
+extern "C" SEXP _arrow_ExecPlan_ToString(SEXP plan_sexp){
 BEGIN_CPP11
   arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
-  ExecPlan_StopProducing(plan);
-  return R_NilValue;
+  return cpp11::as_sexp(ExecPlan_ToString(plan));
 END_CPP11
 }
 // compute-exec.cpp
@@ -910,17 +929,6 @@ BEGIN_CPP11
 END_CPP11
 }
 // compute-exec.cpp
-std::string ExecPlan_BuildAndShow(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options, int64_t head);
-extern "C" SEXP _arrow_ExecPlan_BuildAndShow(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP head_sexp){
-BEGIN_CPP11
-  arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp);
-  arrow::r::Input<const std::shared_ptr<compute::ExecNode>&>::type final_node(final_node_sexp);
-  arrow::r::Input<cpp11::list>::type sort_options(sort_options_sexp);
-  arrow::r::Input<int64_t>::type head(head_sexp);
-  return cpp11::as_sexp(ExecPlan_BuildAndShow(plan, final_node, sort_options, head));
-END_CPP11
-}
-// compute-exec.cpp
 #if defined(ARROW_R_WITH_DATASET)
 std::shared_ptr<compute::ExecNode> ExecNode_Scan(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, std::vector<std::string> materialized_field_names);
 extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){
@@ -4466,6 +4474,15 @@ BEGIN_CPP11
 END_CPP11
 }
 // recordbatchreader.cpp
+void RecordBatchReader__Close(const std::shared_ptr<arrow::RecordBatchReader>& reader);
+extern "C" SEXP _arrow_RecordBatchReader__Close(SEXP reader_sexp){
+BEGIN_CPP11
+  arrow::r::Input<const std::shared_ptr<arrow::RecordBatchReader>&>::type reader(reader_sexp);
+  RecordBatchReader__Close(reader);
+  return R_NilValue;
+END_CPP11
+}
+// recordbatchreader.cpp
 std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(const std::shared_ptr<arrow::RecordBatchReader>& reader);
 extern "C" SEXP _arrow_RecordBatchReader__ReadNext(SEXP reader_sexp){
 BEGIN_CPP11
@@ -5286,11 +5303,13 @@ static const R_CallMethodDef CallEntries[] = {
   { "_arrow_io___CompressedOutputStream__Make", (DL_FUNC) &_arrow_io___CompressedOutputStream__Make, 2},
   { "_arrow_io___CompressedInputStream__Make", (DL_FUNC) &_arrow_io___CompressedInputStream__Make, 2},
   { "_arrow_ExecPlan_create", (DL_FUNC) &_arrow_ExecPlan_create, 1},
+  { "_arrow_ExecPlanReader__batches", (DL_FUNC) &_arrow_ExecPlanReader__batches, 1},
+  { "_arrow_Table__from_ExecPlanReader", (DL_FUNC) &_arrow_Table__from_ExecPlanReader, 1},
+  { "_arrow_ExecPlanReader__Plan", (DL_FUNC) &_arrow_ExecPlanReader__Plan, 1},
+  { "_arrow_ExecPlanReader__PlanStatus", (DL_FUNC) &_arrow_ExecPlanReader__PlanStatus, 1},
   { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 5},
-  { "_arrow_ExecPlan_read_table", (DL_FUNC) &_arrow_ExecPlan_read_table, 5},
-  { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1},
+  { "_arrow_ExecPlan_ToString", (DL_FUNC) &_arrow_ExecPlan_ToString, 1},
   { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1},
-  { "_arrow_ExecPlan_BuildAndShow", (DL_FUNC) &_arrow_ExecPlan_BuildAndShow, 4},
   { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4},
   { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14},
   { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2},
@@ -5617,6 +5636,7 @@ static const R_CallMethodDef CallEntries[] = {
   { "_arrow_RecordBatch__from_arrays", (DL_FUNC) &_arrow_RecordBatch__from_arrays, 2},
   { "_arrow_RecordBatch__ReferencedBufferSize", (DL_FUNC) &_arrow_RecordBatch__ReferencedBufferSize, 1},
   { "_arrow_RecordBatchReader__schema", (DL_FUNC) &_arrow_RecordBatchReader__schema, 1},
+  { "_arrow_RecordBatchReader__Close", (DL_FUNC) &_arrow_RecordBatchReader__Close, 1},
  { "_arrow_RecordBatchReader__ReadNext", (DL_FUNC) &_arrow_RecordBatchReader__ReadNext, 1},
   { "_arrow_RecordBatchReader__batches", (DL_FUNC) &_arrow_RecordBatchReader__batches, 1},
   { "_arrow_RecordBatchReader__from_batches", (DL_FUNC) &_arrow_RecordBatchReader__from_batches, 2},
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index d9fee37e7f138..dd0dc24449efa 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -58,6 +58,8 @@ class ExecNode;
 }  // namespace compute
 }  // namespace arrow
 
+class ExecPlanReader;
+
 #if defined(ARROW_R_WITH_PARQUET)
 #include <parquet/type_fwd.h>
 #endif
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index abcb418a2c237..71dc6d8b2e193 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -56,118 +56,156 @@ std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
   });
 }
 
-std::pair<std::shared_ptr<compute::ExecPlan>, std::shared_ptr<arrow::RecordBatchReader>>
-ExecPlan_prepare(const std::shared_ptr<compute::ExecPlan>& plan,
-                 const std::shared_ptr<compute::ExecNode>& final_node,
-                 cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) {
-  // a section of this code is copied and used in ExecPlan_BuildAndShow - the 2 need
-  // to be in sync
-  // Start of chunk used in ExecPlan_BuildAndShow
+// This class is a special RecordBatchReader that holds a reference to the
+// underlying exec plan so that (1) it can request that the ExecPlan *stop*
+// producing when this object is deleted and (2) it can defer requesting
+// the ExecPlan to *start* producing until the first batch has been pulled.
+// This allows it to be transformed (e.g., using map_batches() or head())
+// and queried (i.e., used as input to another ExecPlan), at the R level
+// while maintaining the ability for the entire plan to be executed at once
+// (e.g., to support user-defined functions) or never executed at all (e.g.,
+// to support printing a nested ExecPlan without having to execute it).
+class ExecPlanReader : public arrow::RecordBatchReader {
+ public:
+  enum ExecPlanReaderStatus { PLAN_NOT_STARTED, PLAN_RUNNING, PLAN_FINISHED };
+
+  ExecPlanReader(const std::shared_ptr<compute::ExecPlan>& plan,
+                 const std::shared_ptr<arrow::Schema>& schema,
+                 arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen)
+      : schema_(schema), plan_(plan), sink_gen_(sink_gen), status_(PLAN_NOT_STARTED) {}
+
+  std::string PlanStatus() const {
+    switch (status_) {
+      case PLAN_NOT_STARTED:
+        return "PLAN_NOT_STARTED";
+      case PLAN_RUNNING:
+        return "PLAN_RUNNING";
+      case PLAN_FINISHED:
+        return "PLAN_FINISHED";
+      default:
+        return "UNKNOWN";
+    }
+  }
 
-  // For now, don't require R to construct SinkNodes.
-  // Instead, just pass the node we should collect as an argument.
-  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
+  std::shared_ptr<arrow::Schema> schema() const override { return schema_; }
 
-  // Sorting uses a different sink node; there is no general sort yet
-  if (sort_options.size() > 0) {
-    if (head >= 0) {
-      // Use the SelectK node to take only what we need
-      MakeExecNodeOrStop(
-          "select_k_sink", plan.get(), {final_node.get()},
-          compute::SelectKSinkNodeOptions{
-              arrow::compute::SelectKOptions(
-                  head, std::dynamic_pointer_cast<compute::SortOptions>(
-                            make_compute_options("sort_indices", sort_options))
-                            ->sort_keys),
-              &sink_gen});
+  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) override {
+    // TODO(ARROW-11841) check a StopToken to potentially cancel this plan
+
+    // If this is the first batch getting pulled, tell the exec plan to
+    // start producing
+    if (status_ == PLAN_NOT_STARTED) {
+      ARROW_RETURN_NOT_OK(StartProducing());
+    }
+
+    // If we've closed the reader, keep sending nullptr
+    // (consistent with what most RecordBatchReader subclasses do)
+    if (status_ == PLAN_FINISHED) {
+      batch_out->reset();
+      return arrow::Status::OK();
+    }
+
+    auto out = sink_gen_().result();
+    if (!out.ok()) {
+      StopProducing();
+      return out.status();
+    }
+
+    if (out.ValueUnsafe()) {
+      auto batch_result = out.ValueUnsafe()->ToRecordBatch(schema_, gc_memory_pool());
+      if (!batch_result.ok()) {
+        StopProducing();
+        return batch_result.status();
+      }
+
+      *batch_out = batch_result.ValueUnsafe();
     } else {
-      MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
-                         compute::OrderBySinkNodeOptions{
-                             *std::dynamic_pointer_cast<compute::SortOptions>(
-                                 make_compute_options("sort_indices", sort_options)),
-                             &sink_gen});
+      batch_out->reset();
+      StopProducing();
     }
-  } else {
-    MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
-                       compute::SinkNodeOptions{&sink_gen});
+
+    return arrow::Status::OK();
   }
 
-  // End of chunk used in ExecPlan_BuildAndShow
+  arrow::Status Close() override {
+    StopProducing();
+    return arrow::Status::OK();
+  }
 
-  StopIfNotOk(plan->Validate());
+  const std::shared_ptr<compute::ExecPlan>& Plan() const { return plan_; }
 
-  // If the generator is destroyed before being completely drained, inform plan
-  std::shared_ptr<void> stop_producing{nullptr, [plan](...) {
-                                         bool not_finished_yet =
-                                             plan->finished().TryAddCallback([&plan] {
-                                               return [plan](const arrow::Status&) {};
-                                             });
+  ~ExecPlanReader() { StopProducing(); }
 
-                                         if (not_finished_yet) {
-                                           plan->StopProducing();
-                                         }
-                                       }};
+ private:
+  std::shared_ptr<arrow::Schema> schema_;
+  std::shared_ptr<compute::ExecPlan> plan_;
+  arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen_;
+  int status_;
 
-  // Attach metadata to the schema
-  auto out_schema = final_node->output_schema();
-  if (metadata.size() > 0) {
-    auto kv = strings_to_kvm(metadata);
-    out_schema = out_schema->WithMetadata(kv);
+  arrow::Status StartProducing() {
+    ARROW_RETURN_NOT_OK(plan_->StartProducing());
+    status_ = PLAN_RUNNING;
+    return arrow::Status::OK();
   }
 
-  std::pair<std::shared_ptr<compute::ExecPlan>, std::shared_ptr<arrow::RecordBatchReader>>
-      out;
-  out.first = plan;
-  out.second = compute::MakeGeneratorReader(
-      out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); },
-      gc_memory_pool());
-  return out;
-}
+  void StopProducing() {
+    if (status_ == PLAN_RUNNING) {
+      // We're done with the plan, but it may still need some time
+      // to finish and clean up after itself. To do this, we give a
+      // callable with its own copy of the shared_ptr<compute::ExecPlan> so
+      // that it can delete itself when it is safe to do so.
+      std::shared_ptr<compute::ExecPlan> plan(plan_);
+      bool not_finished_yet = plan_->finished().TryAddCallback(
+          [&plan] { return [plan](const arrow::Status&) {}; });
+
+      if (not_finished_yet) {
+        plan_->StopProducing();
+      }
+    }
+
+    status_ = PLAN_FINISHED;
+    plan_.reset();
+    sink_gen_ = arrow::MakeEmptyGenerator<util::optional<compute::ExecBatch>>();
+  }
+};
 
 // [[arrow::export]]
-std::shared_ptr<arrow::RecordBatchReader> ExecPlan_run(
-    const std::shared_ptr<compute::ExecPlan>& plan,
-    const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options,
-    cpp11::strings metadata, int64_t head = -1) {
-  auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head);
-  StopIfNotOk(prepared_plan.first->StartProducing());
-  return prepared_plan.second;
+cpp11::list ExecPlanReader__batches(
+    const std::shared_ptr<ExecPlanReader>& reader) {
+  auto result = RunWithCapturedRIfPossible<arrow::RecordBatchVector>(
+      [&]() { return reader->ToRecordBatches(); });
+  return arrow::r::to_r_list(ValueOrStop(result));
 }
 
 // [[arrow::export]]
-std::shared_ptr<arrow::Table> ExecPlan_read_table(
-    const std::shared_ptr<compute::ExecPlan>& plan,
-    const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options,
-    cpp11::strings metadata, int64_t head = -1) {
-  auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head);
-
+std::shared_ptr<arrow::Table> Table__from_ExecPlanReader(
+    const std::shared_ptr<ExecPlanReader>& reader) {
   auto result = RunWithCapturedRIfPossible<std::shared_ptr<arrow::Table>>(
-      [&]() -> arrow::Result<std::shared_ptr<arrow::Table>> {
-        ARROW_RETURN_NOT_OK(prepared_plan.first->StartProducing());
-        return prepared_plan.second->ToTable();
-      });
+      [&]() { return reader->ToTable(); });
   return ValueOrStop(result);
 }
 
 // [[arrow::export]]
-void ExecPlan_StopProducing(const std::shared_ptr<compute::ExecPlan>& plan) {
-  plan->StopProducing();
+std::shared_ptr<compute::ExecPlan> ExecPlanReader__Plan(
+    const std::shared_ptr<ExecPlanReader>& reader) {
+  if (reader->PlanStatus() == "PLAN_FINISHED") {
+    cpp11::stop("Can't extract ExecPlan from a finished ExecPlanReader");
+  }
+
+  return reader->Plan();
 }
 
 // [[arrow::export]]
-std::shared_ptr<arrow::Schema> ExecNode_output_schema(
-    const std::shared_ptr<compute::ExecNode>& node) {
-  return node->output_schema();
+std::string ExecPlanReader__PlanStatus(const std::shared_ptr<ExecPlanReader>& reader) {
+  return reader->PlanStatus();
 }
 
 // [[arrow::export]]
-std::string ExecPlan_BuildAndShow(const std::shared_ptr<compute::ExecPlan>& plan,
-                                  const std::shared_ptr<compute::ExecNode>& final_node,
-                                  cpp11::list sort_options, int64_t head = -1) {
-  // a section of this code is copied from ExecPlan_prepare - the 2 need to be in sync
-  // Start of chunk copied from ExecPlan_prepare
-
+std::shared_ptr<ExecPlanReader> ExecPlan_run(
+    const std::shared_ptr<compute::ExecPlan>& plan,
+    const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options,
+    cpp11::strings metadata, int64_t head = -1) {
   // For now, don't require R to construct SinkNodes.
   // Instead, just pass the node we should collect as an argument.
   arrow::AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
@@ -196,11 +234,29 @@ std::string ExecPlan_BuildAndShow(const std::shared_ptr<compute::ExecPlan>& plan,
                        compute::SinkNodeOptions{&sink_gen});
   }
 
-  // End of chunk copied from ExecPlan_prepare
+  StopIfNotOk(plan->Validate());
+
+  // Attach metadata to the schema
+  auto out_schema = final_node->output_schema();
+  if (metadata.size() > 0) {
+    auto kv = strings_to_kvm(metadata);
+    out_schema = out_schema->WithMetadata(kv);
+  }
+
+  return std::make_shared<ExecPlanReader>(plan, out_schema, sink_gen);
+}
 
+// [[arrow::export]]
+std::string ExecPlan_ToString(const std::shared_ptr<compute::ExecPlan>& plan) {
   return plan->ToString();
 }
 
+// [[arrow::export]]
+std::shared_ptr<arrow::Schema> ExecNode_output_schema(
+    const std::shared_ptr<compute::ExecNode>& node) {
+  return node->output_schema();
+}
+
 #if defined(ARROW_R_WITH_DATASET)
 
 #include
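The lifecycle encoded in `ExecPlanReader` above (defer `StartProducing()` until the first batch is pulled; request `StopProducing()` on `Close()` or destruction) is observable from R. A hedged sketch of the state transitions, using the same internal helpers as the package's tests:

```r
reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:10)))
reader$PlanStatus()                # "PLAN_NOT_STARTED"

batch <- reader$read_next_batch()  # the first pull starts the plan
reader$Close()                     # requests StopProducing() on the ExecPlan
reader$PlanStatus()                # "PLAN_FINISHED"
```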
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index c571d282da186..d0c52acc41695 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -27,6 +27,11 @@ std::shared_ptr<arrow::Schema> RecordBatchReader__schema(
   return reader->schema();
 }
 
+// [[arrow::export]]
+void RecordBatchReader__Close(const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+  return arrow::StopIfNotOk(reader->Close());
+}
+
 // [[arrow::export]]
 std::shared_ptr<arrow::RecordBatch> RecordBatchReader__ReadNext(
     const std::shared_ptr<arrow::RecordBatchReader>& reader) {
@@ -111,19 +116,77 @@ std::shared_ptr<arrow::Table> Table__from_RecordBatchReader(
   return ValueOrStop(reader->ToTable());
 }
 
+// Because the head() operation can leave a RecordBatchReader whose contents
+// will never be drained, we implement a wrapper class here that takes care
+// to (1) return only the requested number of rows (or fewer) and (2) Close
+// and release the underlying reader as soon as possible. This is mostly
+// useful for the ExecPlanReader, whose Close() method also requests
+// that the ExecPlan stop producing, but may also be useful for readers
+// that point to an open file and whose Close() or delete method releases
+// the file.
+class RecordBatchReaderHead : public arrow::RecordBatchReader {
+ public:
+  RecordBatchReaderHead(std::shared_ptr<arrow::RecordBatchReader> reader,
+                        int64_t num_rows)
+      : schema_(reader->schema()), reader_(reader), num_rows_(num_rows) {}
+
+  std::shared_ptr<arrow::Schema> schema() const override { return schema_; }
+
+  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) override {
+    if (!reader_) {
+      // Close() has been called
+      batch_out = nullptr;
+      return arrow::Status::OK();
+    }
+
+    ARROW_RETURN_NOT_OK(reader_->ReadNext(batch_out));
+    if (batch_out->get()) {
+      num_rows_ -= batch_out->get()->num_rows();
+      if (num_rows_ < 0) {
+        auto smaller_batch =
+            batch_out->get()->Slice(0, batch_out->get()->num_rows() + num_rows_);
+        *batch_out = smaller_batch;
+      }
+
+      if (num_rows_ <= 0) {
+        // We've run out of num_rows before batches
+        ARROW_RETURN_NOT_OK(Close());
+      }
+    } else {
+      // We've run out of batches before num_rows
+      ARROW_RETURN_NOT_OK(Close());
+    }
+
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Close() override {
+    if (reader_) {
+      arrow::Status result = reader_->Close();
+      reader_.reset();
+      return result;
+    } else {
+      return arrow::Status::OK();
+    }
+  }
+
+ private:
+  std::shared_ptr<arrow::Schema> schema_;
+  std::shared_ptr<arrow::RecordBatchReader> reader_;
+  int64_t num_rows_;
+};
+
 // [[arrow::export]]
 std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__Head(
     const std::shared_ptr<arrow::RecordBatchReader>& reader, int64_t num_rows) {
-  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-  std::shared_ptr<arrow::RecordBatch> this_batch;
-  while (num_rows > 0) {
-    this_batch = ValueOrStop(reader->Next());
-    if (this_batch == nullptr) break;
-    batches.push_back(this_batch->Slice(0, num_rows));
-    num_rows -= this_batch->num_rows();
+  if (num_rows <= 0) {
+    // If we are never going to pull any batches from this reader, close it
+    // immediately.
+    StopIfNotOk(reader->Close());
+    return ValueOrStop(arrow::RecordBatchReader::Make({}, reader->schema()));
+  } else {
+    return std::make_shared<RecordBatchReaderHead>(reader, num_rows);
   }
-  return ValueOrStop(
-      arrow::RecordBatchReader::Make(std::move(batches), reader->schema()));
 }
 
 // -------- RecordBatchStreamReader
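`RecordBatchReaderHead` slices the final batch to the requested row count and closes the wrapped reader as soon as enough rows have been seen, so an `ExecPlan` feeding `head()` is asked to stop producing early rather than being fully drained. A sketch of the user-visible effect (mirrors the new tests):

```r
reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:1000)))

head_reader <- head(reader, 4)  # wraps reader lazily; nothing has run yet
head_reader$read_table()        # 4 rows; the underlying reader is Closed
```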
diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R
index 5821c0fa2df1c..11c37519ae5eb 100644
--- a/r/tests/testthat/test-compute.R
+++ b/r/tests/testthat/test-compute.R
@@ -91,11 +91,7 @@ test_that("register_scalar_function() adds a compute function to the registry", {
     int32(), float64(),
     auto_convert = TRUE
   )
-  on.exit({
-    unregister_binding("times_32", update_cache = TRUE)
-    # TODO(ARROW-17178) remove the need for this!
-    Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
-  })
+  on.exit(unregister_binding("times_32", update_cache = TRUE))
 
   expect_true("times_32" %in% names(asNamespace("arrow")$.cache$functions))
   expect_true("times_32" %in% list_compute_functions())
@@ -127,11 +123,7 @@ test_that("arrow_scalar_function() with bad return type errors", {
     int32(),
     float64()
   )
-  on.exit({
-    unregister_binding("times_32_bad_return_type_array", update_cache = TRUE)
-    # TODO(ARROW-17178) remove the need for this!
-    Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
-  })
+  on.exit(unregister_binding("times_32_bad_return_type_array", update_cache = TRUE))
 
   expect_error(
     call_function("times_32_bad_return_type_array", Array$create(1L)),
@@ -144,11 +136,7 @@ test_that("arrow_scalar_function() with bad return type errors", {
     int32(),
     float64()
   )
-  on.exit({
-    unregister_binding("times_32_bad_return_type_scalar", update_cache = TRUE)
-    # TODO(ARROW-17178) remove the need for this!
-    Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
-  })
+  on.exit(unregister_binding("times_32_bad_return_type_scalar", update_cache = TRUE))
 
   expect_error(
     call_function("times_32_bad_return_type_scalar", Array$create(1L)),
@@ -166,11 +154,7 @@ test_that("register_scalar_function() can register multiple kernels", {
     out_type = function(in_types) in_types[[1]],
     auto_convert = TRUE
   )
-  on.exit({
-    unregister_binding("times_32", update_cache = TRUE)
-    # TODO(ARROW-17178) remove the need for this!
-    Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
-  })
+  on.exit(unregister_binding("times_32", update_cache = TRUE))
 
   expect_equal(
     call_function("times_32", Scalar$create(1L, int32())),
@@ -189,9 +173,6 @@ test_that("register_scalar_function() can register multiple kernels", {
 })
 
 test_that("register_scalar_function() errors for unsupported specifications", {
-  # TODO(ARROW-17178) remove the need for this!
-  on.exit(Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF"))
-
   expect_error(
     register_scalar_function(
       "no_kernels",
@@ -256,11 +237,7 @@ test_that("user-defined functions work during multi-threaded execution", {
     float64(),
     auto_convert = TRUE
   )
-  on.exit({
-    unregister_binding("times_32", update_cache = TRUE)
-    # TODO(ARROW-17178) remove the need for this!
-    Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
-  })
+  on.exit(unregister_binding("times_32", update_cache = TRUE))
 
   # check a regular collect()
   result <- open_dataset(tf_dataset) %>%
@@ -282,7 +259,7 @@ test_that("user-defined functions work during multi-threaded execution", {
   expect_identical(result2$fun_result, example_df$value * 32)
 })
 
-test_that("user-defined error when called from an unsupported context", {
+test_that("nested exec plans can contain user-defined functions", {
   skip_if_not_available("dataset")
   skip_if_not(CanRunWithCapturedR())
 
@@ -293,11 +270,7 @@ test_that("nested exec plans can contain user-defined functions", {
     float64(),
     auto_convert = TRUE
   )
-  on.exit({
-    unregister_binding("times_32", update_cache = TRUE)
-    # TODO(ARROW-17178) remove the need for this!
-    Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
-  })
+  on.exit(unregister_binding("times_32", update_cache = TRUE))
 
   stream_plan_with_udf <- function() {
     record_batch(a = 1:1000) %>%
@@ -313,24 +286,35 @@
       dplyr::collect()
   }
 
-  if (identical(tolower(Sys.info()[["sysname"]]), "windows")) {
-    expect_equal(
-      stream_plan_with_udf(),
-      record_batch(a = 1:1000) %>%
-        dplyr::mutate(b = times_32(a)) %>%
-        dplyr::collect(as_data_frame = FALSE)
-    )
-
-    result <- collect_plan_with_head()
-    expect_equal(nrow(result), 11)
-  } else {
-    expect_error(
-      stream_plan_with_udf(),
-      "Call to R \\(.*?\\) from a non-R thread from an unsupported context"
-    )
-    expect_error(
-      collect_plan_with_head(),
-      "Call to R \\(.*?\\) from a non-R thread from an unsupported context"
-    )
-  }
+  expect_equal(
+    stream_plan_with_udf(),
+    record_batch(a = 1:1000) %>%
+      dplyr::mutate(b = times_32(a)) %>%
+      dplyr::collect(as_data_frame = FALSE)
+  )
+
+  result <- collect_plan_with_head()
+  expect_equal(nrow(result), 11)
+})
+
+test_that("head() on exec plan containing user-defined functions", {
+  skip_if_not_available("dataset")
+  skip_if_not(CanRunWithCapturedR())
+
+  register_scalar_function(
+    "times_32",
+    function(context, x) x * 32.0,
+    int32(),
+    float64(),
+    auto_convert = TRUE
+  )
+  on.exit(unregister_binding("times_32", update_cache = TRUE))
+
+  result <- record_batch(a = 1:1000) %>%
+    dplyr::mutate(b = times_32(a)) %>%
+    as_record_batch_reader() %>%
+    head(11) %>%
+    dplyr::collect()
+
+  expect_equal(nrow(result), 11)
 })
diff --git a/r/tests/testthat/test-query-engine.R b/r/tests/testthat/test-query-engine.R
index dd87335f876d6..f2190eb668400 100644
--- a/r/tests/testthat/test-query-engine.R
+++ b/r/tests/testthat/test-query-engine.R
@@ -17,6 +17,66 @@
 
 library(dplyr, warn.conflicts = FALSE)
 
+test_that("ExecPlanReader does not start evaluating a query", {
+  rbr <- as_record_batch_reader(
+    function(x) stop("This query will error if started"),
+    schema = schema(a = int32())
+  )
+
+  reader <- as_record_batch_reader(as_adq(rbr))
+  expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+  expect_error(reader$read_table(), "This query will error if started")
+  expect_identical(reader$PlanStatus(), "PLAN_FINISHED")
+})
+
+test_that("ExecPlanReader evaluates nested exec plans lazily", {
+  reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:10)))
+  expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+
+  head_reader <- head(reader, 4)
+  expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+
+  expect_equal(
+    head_reader$read_table(),
+    arrow_table(a = 1:4)
+  )
+
+  expect_identical(reader$PlanStatus(), "PLAN_FINISHED")
+})
+
+test_that("ExecPlanReader evaluates head() lazily", {
+  reader <- as_record_batch_reader(as_adq(arrow_table(a = 1:10)))
+  expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+
+  head_reader <- head(reader, 4)
+  expect_identical(reader$PlanStatus(), "PLAN_NOT_STARTED")
+
+  expect_equal(
+    head_reader$read_table(),
+    arrow_table(a = 1:4)
+  )
+
+  expect_identical(reader$PlanStatus(), "PLAN_FINISHED")
+})
+
+test_that("ExecPlanReader evaluates head() lazily", {
+  # Make a rather long RecordBatchReader
+  reader <- RecordBatchReader$create(
+    batches = rep(
+      list(record_batch(line = letters)),
+      100L
+    )
+  )
+
+  # ...But only get 10 rows from it
+  query <- head(as_adq(reader), 10)
+  expect_identical(as_arrow_table(query)$num_rows, 10L)
+
+  # Depending on exactly how quickly background threads respond to the
+  # request to cancel, reader$read_table()$num_rows > 0 may or may not
+  # evaluate to TRUE (i.e., the reader may or may not be completely drained).
+})
+
 test_that("do_exec_plan_substrait can evaluate a simple plan", {
   skip_if_not_available("substrait")