Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17178: [R] Support head() in arrow_dplyr_query with user-defined function #13706

Merged
merged 34 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f9f696d
consolidate exec plan reader into a dedicated reader class
paleolimbot Jul 25, 2022
dbd2d61
fix the head thing
paleolimbot Jul 25, 2022
095b3f0
undo some kludges introduced
paleolimbot Jul 25, 2022
4a32870
explicitly close readers where possible
paleolimbot Jul 25, 2022
24bc1cd
just kidding don't do that
paleolimbot Jul 25, 2022
1542829
just a shot in the dark
paleolimbot Aug 11, 2022
9e9170f
try a new strategy for stopproducing
paleolimbot Aug 11, 2022
30a8684
try a more raw approach to stoppinh production
paleolimbot Aug 15, 2022
8225dc4
always set status
paleolimbot Aug 15, 2022
d69e365
another approach to stopping production
paleolimbot Aug 15, 2022
e860650
more sane but still deadlocking stop_producing
paleolimbot Aug 22, 2022
7a54584
uncomment
paleolimbot Aug 22, 2022
7093723
go back to the other approach
paleolimbot Aug 23, 2022
27143d6
remove unnecessary Close()
paleolimbot Aug 25, 2022
afe61cd
maybe fix python test
paleolimbot Aug 25, 2022
bcd17bc
with ExecPlanReader R6
paleolimbot Aug 26, 2022
2f9a827
undo python workaround
paleolimbot Aug 26, 2022
a5d24a2
with tests to enforce laziness
paleolimbot Aug 26, 2022
9ee0602
document the rbr subclasses
paleolimbot Aug 26, 2022
a915050
exec plan reader head() + tests
paleolimbot Aug 26, 2022
40a99b2
remove collect with udf kludge
paleolimbot Aug 26, 2022
020d338
nix the custom head method
paleolimbot Aug 26, 2022
93764f6
wait for idle thread pool on valgrind job
paleolimbot Aug 29, 2022
0b8ddfa
remove thread pool stuff (it didn't work)
paleolimbot Aug 31, 2022
05f7067
simplify arrow_dplyr_query to table
paleolimbot Aug 31, 2022
b6b9102
add test for lazy head()
paleolimbot Aug 31, 2022
5ec7c1a
don't use R function call recordbatchreader just now
paleolimbot Aug 31, 2022
adc61f6
add a note about lazy stopping but don't test it quite yet
paleolimbot Aug 31, 2022
921296f
Apply suggestions from code review
paleolimbot Sep 12, 2022
4fece2a
make the test less funny and less confusing
paleolimbot Sep 14, 2022
df724b4
attempt to simplify the call to StopProducing()
paleolimbot Sep 14, 2022
1e6d571
comment to remind myself why the complicated callback thing needs to …
paleolimbot Sep 14, 2022
7d8e92f
update for C++17
paleolimbot Sep 15, 2022
92e0416
clang-format
paleolimbot Sep 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 0 additions & 7 deletions r/R/compute.R
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,6 @@ register_scalar_function <- function(name, fun, in_type, out_type,
update_cache = TRUE
)

# User-defined functions require some special handling
# in the query engine which currently require an opt-in using
# the R_ARROW_COLLECT_WITH_UDF environment variable while this
# behaviour is stabilized.
# TODO(ARROW-17178) remove the need for this!
Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")

invisible(NULL)
}

Expand Down
9 changes: 6 additions & 3 deletions r/R/dplyr.R
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,19 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) {
#' show_exec_plan()
show_exec_plan <- function(x) {
adq <- as_adq(x)
plan <- ExecPlan$create()

# do not show the plan if we have a nested query (as this will force the
# evaluation of the inner query/queries)
# TODO see if we can remove after ARROW-16628
if (is_collapsed(x) && has_head_tail(x$.data)) {
warn("The `ExecPlan` cannot be printed for a nested query.")
return(invisible(x))
}
final_node <- plan$Build(adq)
cat(plan$BuildAndShow(final_node))

result <- as_record_batch_reader(adq)
cat(result$Plan()$ToString())
result$Close()

invisible(x)
}

Expand Down
84 changes: 25 additions & 59 deletions r/R/query-engine.R
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,11 @@ ExecPlan <- R6Class("ExecPlan",
}
node
},
Run = function(node, as_table = FALSE) {
# a section of this code is used by `BuildAndShow()` too - the 2 need to be in sync
# Start of chunk used in `BuildAndShow()`
Run = function(node) {
assert_is(node, "ExecNode")

# Sorting and head/tail (if sorted) are handled in the SinkNode,
# created in ExecPlan_run
# created in ExecPlan_build
sorting <- node$extras$sort %||% list()
select_k <- node$extras$head %||% -1L
has_sorting <- length(sorting) > 0
Expand All @@ -214,16 +212,7 @@ ExecPlan <- R6Class("ExecPlan",
sorting$orders <- as.integer(sorting$orders)
}

# End of chunk used in `BuildAndShow()`

# If we are going to return a Table anyway, we do this in one step and
# entirely in one C++ call to ensure that we can execute user-defined
# functions from the worker threads spawned by the ExecPlan. If not, we
# use ExecPlan_run which returns a RecordBatchReader that can be
# manipulated in R code (but that right now won't work with
# user-defined functions).
exec_fun <- if (as_table) ExecPlan_read_table else ExecPlan_run
out <- exec_fun(
out <- ExecPlan_run(
self,
node,
sorting,
Expand All @@ -240,18 +229,13 @@ ExecPlan <- R6Class("ExecPlan",
slice_size <- node$extras$head %||% node$extras$tail
if (!is.null(slice_size)) {
out <- head(out, slice_size)
# We already have everything we need for the head, so StopProducing
self$Stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we want/need this anymore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the change to head.RecordBatchReader(), no batches get pulled when head(out) is called (which is need to make sure that the ExecPlan can continue to defer StartProducing()). The C++ implementation of head.RecordBatchReader() takes care of calling Close() on the upstream reader (which, if it's an ExecPlanReader, will call StopProducing(). I should add a test to make sure that's the case though!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test added!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...the test passes locally and on Windows but fails on all the Linux CI. My guess is that this is related to how the request to stop producing is (or is not) relayed to the source node (or maybe how quickly that happens?).

}
} else if (!is.null(node$extras$tail)) {
# TODO(ARROW-16630): proper BottomK support
# Reverse the row order to get back what we expect
out <- as_arrow_table(out)
out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
# Put back into RBR
if (!as_table) {
out <- as_record_batch_reader(out)
}
out <- as_record_batch_reader(out)
}

# If arrange() created $temp_columns, make sure to omit them from the result
Expand All @@ -261,11 +245,7 @@ ExecPlan <- R6Class("ExecPlan",
if (length(node$extras$sort$temp_columns) > 0) {
tab <- as_arrow_table(out)
tab <- tab[, setdiff(names(tab), node$extras$sort$temp_columns), drop = FALSE]
if (!as_table) {
out <- as_record_batch_reader(tab)
} else {
out <- tab
}
out <- as_record_batch_reader(tab)
}

out
Expand All @@ -279,40 +259,9 @@ ExecPlan <- R6Class("ExecPlan",
...
)
},
# SinkNodes (involved in arrange and/or head/tail operations) are created in
# ExecPlan_run and are not captured by the regulat print method. We take a
# similar approach to expose them before calling the print method.
BuildAndShow = function(node) {
# a section of this code is copied from `Run()` - the 2 need to be in sync
# Start of chunk copied from `Run()`

assert_is(node, "ExecNode")

# Sorting and head/tail (if sorted) are handled in the SinkNode,
# created in ExecPlan_run
sorting <- node$extras$sort %||% list()
select_k <- node$extras$head %||% -1L
has_sorting <- length(sorting) > 0
if (has_sorting) {
if (!is.null(node$extras$tail)) {
# Reverse the sort order and take the top K, then after we'll reverse
# the resulting rows so that it is ordered as expected
sorting$orders <- !sorting$orders
select_k <- node$extras$tail
}
sorting$orders <- as.integer(sorting$orders)
}

# End of chunk copied from `Run()`

ExecPlan_BuildAndShow(
self,
node,
sorting,
select_k
)
},
Stop = function() ExecPlan_StopProducing(self)
ToString = function() {
ExecPlan_ToString(self)
}
)
)
# nolint end.
Expand Down Expand Up @@ -396,6 +345,23 @@ ExecNode <- R6Class("ExecNode",
)
)

ExecPlanReader <- R6Class("ExecPlanReader",
inherit = RecordBatchReader,
public = list(
batches = function() ExecPlanReader__batches(self),
read_table = function() Table__from_ExecPlanReader(self),
Plan = function() ExecPlanReader__Plan(self),
PlanStatus = function() ExecPlanReader__PlanStatus(self),
ToString = function() {
sprintf(
"<Status: %s>\n\n%s\n\nSee $Plan() for details.",
self$PlanStatus(),
super$ToString()
)
}
)
)

do_exec_plan_substrait <- function(substrait_plan) {
if (is.string(substrait_plan)) {
substrait_plan <- substrait__internal__SubstraitFromJSON(substrait_plan)
Expand Down
1 change: 1 addition & 0 deletions r/R/record-batch-reader.R
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ RecordBatchReader <- R6Class("RecordBatchReader",
read_next_batch = function() RecordBatchReader__ReadNext(self),
batches = function() RecordBatchReader__batches(self),
read_table = function() Table__from_RecordBatchReader(self),
Close = function() RecordBatchReader__Close(self),
export_to_c = function(stream_ptr) ExportRecordBatchReader(self, stream_ptr),
ToString = function() self$schema$ToString()
),
Expand Down
12 changes: 1 addition & 11 deletions r/R/table.R
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,5 @@ as_arrow_table.RecordBatchReader <- function(x, ...) {
#' @rdname as_arrow_table
#' @export
as_arrow_table.arrow_dplyr_query <- function(x, ...) {
# See query-engine.R for ExecPlan/Nodes
plan <- ExecPlan$create()
final_node <- plan$Build(x)

run_with_event_loop <- identical(
Sys.getenv("R_ARROW_COLLECT_WITH_UDF", ""),
"true"
)

result <- plan$Run(final_node, as_table = run_with_event_loop)
as_arrow_table(result)
as_arrow_table(as_record_batch_reader(x))
}
78 changes: 49 additions & 29 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions r/src/arrow_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class ExecNode;
} // namespace compute
} // namespace arrow

class ExecPlanReader;

#if defined(ARROW_R_WITH_PARQUET)
#include <parquet/type_fwd.h>
#endif
Expand Down
Loading