ARROW-11841: [R][C++] Allow cancelling long-running commands #13635

Merged: 39 commits into apache:master from r-cancelme, Oct 3, 2022

Conversation

paleolimbot (Member)

No description provided.

@github-actions

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@paleolimbot (Member Author)

I don't think this works yet, or at least it doesn't reduce the amount of time it takes to read a big CSV. I'm using the reprex below to test. I don't understand a lot about the content of this PR, so feel free to change everything about it!

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

tf <- tempfile()
readr::write_csv(vctrs::vec_rep(mtcars, 5e5), tf)

# try to slow down CSV reading
set_cpu_count(1)
set_io_thread_count(2)

# hit Control-C while this line runs!
system.time(read_csv_arrow(tf))
#>    user  system elapsed 
#>   2.785   0.337   3.220

Created on 2022-07-18 by the reprex package (v2.0.1)

I think the overriding calling handler is being called, or at least I can do stuff there that causes R to crash (but breakpoints and printing don't seem to work here, maybe because of the interrupt?).

@paleolimbot (Member Author)

@pitrou Could you take a look at the signal handling code to see if there's anything that looks incorrect? It doesn't work, but I'm not sure if it's because I'm calling the Arrow API incorrectly or because I'm not registering the signal handler correctly.

It looks like the stop source cancellation already works on Windows? https://github.com/apache/arrow/runs/7396189436?check_suite_focus=true#step:11:49243. Does that ring any bells?

@paleolimbot (Member Author)

I'm a little concerned about the prospect of messing with signal handlers in RunWithCapturedR right before a release, since this runs during pretty much all of the most-used functions in Arrow (read/write CSV, Feather, and the query engine after the user-defined functions PR merges).

Is there any chance I can convince you to allow registering a (thread-safe) callback? I would love to be able to RegisterThreadSafeUserInterruptChecker([]() { SafeCallIntoR([]() { /* return true or false */ }); }) (but I understand if this is impossible or a bad idea based on how this is implemented in Arrow C++).
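For concreteness, a minimal standalone sketch of what such a hook could look like. RegisterThreadSafeUserInterruptChecker is the made-up name from this comment, not an Arrow C++ API, and the atomic flag stands in for R's interrupt state (which in the R package would be consulted via SafeCallIntoR()):

```cpp
// Hypothetical sketch only: RegisterThreadSafeUserInterruptChecker is a
// made-up name, not an Arrow C++ API. The atomic flag stands in for R's
// interrupt state; in the R package the checker would go through
// SafeCallIntoR() instead.
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

// A single global checker that a binding (e.g. the R package) could install.
// It must be safe to call from any thread.
static std::function<bool()> g_interrupt_checker;

void RegisterThreadSafeUserInterruptChecker(std::function<bool()> checker) {
  g_interrupt_checker = std::move(checker);
}

// What a long-running task would do: poll the checker between chunks of work
// and stop early if it reports an interrupt.
void LongRunningTask() {
  for (int chunk = 0; chunk < 100; ++chunk) {
    if (g_interrupt_checker && g_interrupt_checker()) {
      std::cout << "cancelled at chunk " << chunk << std::endl;
      return;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(10));  // "work"
  }
  std::cout << "finished" << std::endl;
}

int main() {
  std::atomic<bool> user_pressed_ctrl_c{false};  // stand-in for R's flag
  RegisterThreadSafeUserInterruptChecker(
      [&]() { return user_pressed_ctrl_c.load(); });

  std::thread worker(LongRunningTask);
  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  user_pressed_ctrl_c = true;  // simulate Control-C
  worker.join();
  return 0;
}
```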

@pitrou (Member) commented Jul 20, 2022

This is actually a TODO: https://issues.apache.org/jira/browse/ARROW-12938

What is RegisterThreadSafeUserInterruptChecker?

@pitrou (Member) commented Jul 20, 2022

Also, regardless, some signal handler must be used to detect user interruption. Otherwise, how do you plan to do it?

@paleolimbot (Member Author)

RegisterThreadSafeUserInterruptChecker() is my made-up name for however one registers the callback that is forwarded from whatever is checking the stop token. It sounds like this is slightly different from the JIRA, which is about forwarding stop requests (I was hoping to forward the call from whatever is checking for stop requests).

As I understand it, R already has what is basically a global stop token implemented via a signal handler; however, there is no thread-safe way to query it. Because we have SafeCallIntoR(), we can query it from any thread (but I don't think there's an API to insert that callable into Arrow).

I'm sure temporarily overriding R's signal handler is fine too, I just don't have confidence in my ability to do that safely before the release.

@pitrou (Member) commented Jul 20, 2022

Ah, well, we can defer this until after 9.0.0 then :-)

@pitrou (Member) commented Jul 21, 2022

The new code looks much more reasonable. Does it work? :-)

@paleolimbot (Member Author)

I appreciate your patience with anything I said earlier...the degree to which I misunderstood this API is difficult to describe (in particular, I didn't get that a new StopToken had to be passed to each cancellable operation). I'm happy to say it works! I would prefer to be able to subclass a StopToken and provide an R-specific implementation, but that's a separate discussion.

I had to use the StopSource::Reset() method that is marked internal (or else cancelling one thing results in many subsequent operations being marked as cancelled). I initially worked around that by using R's equivalent of a context manager to set and reset the stop source pointer, but that's difficult to program around because we can't always control what does and does not go within with_stop_source(some_code).

I asked in the r-lib channel about the safety of overriding signal handlers and they seemed to indicate that doing it temporarily is OK as long as the R API isn't called. Hence, I only enable the signal handlers when we're about to launch a worker thread to do "arrow stuff" and disable them when SafeCallIntoR() is invoked. This is the part that I'm worried will break.
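A rough sketch of that enable-around-the-worker-thread pattern, assuming the signal-cancellation helpers declared in arrow/util/cancel.h (SetSignalStopSource, RegisterCancellingSignalHandler, UnregisterCancellingSignalHandler, ResetSignalStopSource); exact signatures may differ between Arrow versions, and the RAII guard name is invented:

```cpp
// Signal handlers are registered only while a worker thread is doing
// "arrow stuff", and unregistered again before calling back into R.
#include <chrono>
#include <csignal>
#include <thread>

#include <arrow/util/cancel.h>

// RAII guard: SIGINT cancels the signal StopSource only while this is alive.
class WithCancellingSignalHandler {
 public:
  WithCancellingSignalHandler() {
    registered_ = arrow::RegisterCancellingSignalHandler({SIGINT}).ok();
  }
  ~WithCancellingSignalHandler() {
    if (registered_) arrow::UnregisterCancellingSignalHandler();
  }

 private:
  bool registered_ = false;
};

int main() {
  // Done once, e.g. on package load.
  auto maybe_source = arrow::SetSignalStopSource();
  if (!maybe_source.ok()) return 1;
  arrow::StopSource* source = *maybe_source;

  {
    WithCancellingSignalHandler guard;  // handlers active only in this scope
    arrow::StopToken token = source->token();
    std::thread worker([token]() {
      // A long-running task checks the token between chunks of work.
      while (!token.IsStopRequested()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    source->RequestStop();  // stand-in for Control-C hitting the handler
    worker.join();
  }  // handlers unregistered here, e.g. right before SafeCallIntoR()

  arrow::ResetSignalStopSource();  // done once, e.g. on package unload
  return 0;
}
```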

@pitrou (Member) commented Jul 21, 2022

> I had to use the StopSource::Reset() method that is marked internal (or else cancelling one thing results in many subsequent operations marked as cancelled).

Hmm, I'm curious, what do you mean with "subsequent operations"? Are they part of the same overall cancellable call?

Resolved (outdated) review threads on: r/src/filesystem.cpp, r/_pkgdown.yml, r/src/safe-call-into-r.h

// Call this method from the R thread (e.g., on package load)
// to save an internal copy of the thread id.
void Initialize() {
  thread_id_ = std::this_thread::get_id();
  initialized_ = true;
  SetError(R_NilValue);
  ResetError();
  arrow::ResetSignalStopSource();
Member:
Hmm, this doesn't seem right (though perhaps it works by chance). As the C++ docstring says:

/// The only allowed order of calls is the following:
/// - SetSignalStopSource()
/// - any number of pairs of (RegisterCancellingSignalHandler,
///   UnregisterCancellingSignalHandler) calls
/// - ResetSignalStopSource()

Member Author:

The reason that one is there is that if you reload the DLL (e.g., devtools::load_all()), you end up with an error (stop source already set up, or something). Most R developers run devtools::load_all() many times per commit, so we need something that works with that workflow.

Member:

Wow.

Member Author:

If I don't horrify you with at least one thing that R developers do per day then I feel I haven't done my job (I'm pretty sure there's a DLL unload hook though, I just have to remember what it is...)

Member:

:-D

Member Author:

Ok, I found it (and made the error on package load silent, since in theory somebody could load the R package after loading the Python package in a conda environment or something, which would have the same problem of setting the signal stop source twice).

ResetError();
throw e;
if (SignalStopSourceEnabled()) {
stop_source_->Reset();
Member:

This shouldn't be needed. Instead, ResetSignalStopSource will trigger creation of a new stop source the next time SetSignalStopSource is called.

Member Author:

It's my reading that ResetSignalStopSource() will also invalidate the stop_source_ pointer?

If I do what the docs say:

  • I call read_csv_arrow() and cancel it
  • Every subsequent call to read_csv_arrow() fails with "Cancelled: received signal 2" (I forget the exact message), even if I don't cancel it.

The context manager workaround (where the stop_source_ pointer is managed by something like with_stop_source(some_code)) is not practical...there are many workflows where a user creates a filesystem (e.g., s3_bucket()) and re-uses it between things that might be cancellable (like read_csv_arrow(bucket$path("some/file.csv"))). It's my reading that the stop token would be assigned when the filesystem is created.
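A small sketch of why the persisted token bites reused objects, assuming arrow::StopSource/StopToken and the arrow::io::IOContext constructor that takes a StopToken (overloads may vary between Arrow versions): once the token captured at construction has been cancelled, everything that keeps that IOContext around keeps seeing the cancelled state.

```cpp
// The StopToken is captured by the IOContext when it is constructed, so an
// object that holds onto that IOContext (a filesystem, for example) keeps
// seeing "cancelled" after one interrupted call.
#include <iostream>

#include <arrow/io/interfaces.h>
#include <arrow/util/cancel.h>

int main() {
  arrow::StopSource source;
  arrow::io::IOContext io_context(source.token());  // token captured here

  source.RequestStop();  // e.g. the user hit Control-C during one read

  // Every later operation that polls this context's token still sees the
  // cancellation, which is why the PR resets the stop source between
  // top-level calls.
  std::cout << io_context.stop_token().Poll().ToString() << std::endl;
  return 0;
}
```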

@pitrou (Member), Jul 21, 2022:

Ah, right, the StopToken is persisted on the IOContext... hmm, that's a more general usability issue that I hadn't thought about :-/

Can you perhaps open a JIRA about this problem?

Member Author:

I will. In the meantime, is the usage of Reset() bad enough that this PR needs to be put on hold until it is resolved, or is it an acceptable alternative until something better is available? (I get that it may create interactions with cancelled operations in Python in the rare but not unheard-of case of R and Python linking to the same .so.)

Member:

I don't think it's bad here. That said, we probably agree that this PR is for 10.0?

Member Author:

While I'm no longer concerned about it now that I'm not responsible for the signal handling code, it certainly would benefit from interactive use since it's hard to test sending an interrupt signal.

Member Author:

Somewhere in the middle, we could allow users to opt in to using it (arrow::enable_stop_source()).

Member:

Ah, an opt-in setting sounds nice. And ask the R Voltron folks to play with it perhaps :-)

Member Author:

Opt-in complete; JIRA is here: ARROW-17173

Two more resolved (outdated) review threads on r/src/safe-call-into-r.h.
@paleolimbot marked this pull request as ready for review on July 21, 2022 18:13
@paleolimbot (Member Author)

I did some more testing, and the only thing this works for is CSV reading (probably because the CSV reader takes its own StopToken and apparently doesn't care about the IOContext's). In fact, I can't find anywhere that the IOContext's StopToken actually works (for example, collecting a dataset). I attempted a few combinations of inserting the StopToken into some query engine things in compute-exec.cpp, none of which actually resulted in the ability to stop a query plan.

We could either merge this because it contains some useful infrastructure and will make follow-up PRs smaller in scope, or wait until there's a clearer path to everything getting cancelled properly (I'm happy with either).

Reprex that I'm working with:

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

tf <- tempfile()
if (!file.exists(tf)) readr::write_csv(vctrs::vec_rep(mtcars, 7e5), tf)

# try to slow down CSV reading
set_cpu_count(1)
set_io_thread_count(2)

# turn on the stop source thing
enable_cancel_from_interrupt()

# Control-C during this works
read_csv_arrow(tf, as_data_frame = FALSE)
#> Table
#> 22400000 rows x 11 columns
#> $mpg <double>
#> $cyl <int64>
#> $disp <double>
#> $hp <int64>
#> $drat <double>
#> $wt <double>
#> $qsec <double>
#> $vs <int64>
#> $am <int64>
#> $gear <int64>
#> $carb <int64>

# Control-C during this doesn't
system.time(open_dataset(tf, format = "csv") |> dplyr::collect(as_data_frame = FALSE))
#>    user  system elapsed 
#>   3.900   0.259   4.072

Created on 2022-07-22 by the reprex package (v2.0.1)

@pitrou (Member) commented Jul 22, 2022

The StopToken will only work if it is actually polled by the implementation. I'm not sure the dataset layer does that. @westonpace

Also, the CSV reader does use the IOContext's StopToken:

Result<std::shared_ptr<Table>> Read() override {
  task_group_ = TaskGroup::MakeSerial(io_context_.stop_token());

@westonpace (Member)

> The StopToken will only work if it is actually polled by the implementation. I'm not sure the dataset layer does that. @westonpace

It does not. The exec plan has its own cancellation method (StopProducing). However, the scan node will need to connect StopProducing to the actual readers. So I think this will result in the exec plan creating its own stop source which it uses to generate stop tokens for the file readers. Then, when the user calls StopProducing, the exec plan will cancel the source.

Potentially we could adjust the exec plan to also have a stop source, in a "query options" object or something like that, and then we could get rid of StopProducing (at least externally). That would give a slightly more consistent overall API.
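A conceptual model of that design, not the actual ExecPlan API: the class and method names below are invented for illustration, and only arrow::StopSource/StopToken are real. The plan owns one StopSource, hands out tokens from it to each file reader, and StopProducing() simply requests stop on that source.

```cpp
// Conceptual model only: "HypotheticalPlan" and "MakeScannerToken" are
// invented names, not the ExecPlan API.
#include <vector>

#include <arrow/util/cancel.h>

class HypotheticalPlan {
 public:
  // Each reader created by the scan node would be handed one of these tokens.
  arrow::StopToken MakeScannerToken() { return stop_source_.token(); }

  // The existing cancellation entry point forwards to the plan-owned source,
  // so every reader sees the stop request.
  void StopProducing() { stop_source_.RequestStop(); }

 private:
  arrow::StopSource stop_source_;
};

int main() {
  HypotheticalPlan plan;
  std::vector<arrow::StopToken> reader_tokens = {plan.MakeScannerToken(),
                                                 plan.MakeScannerToken()};
  plan.StopProducing();
  for (const auto& token : reader_tokens) {
    if (!token.IsStopRequested()) return 1;  // all tokens see the cancellation
  }
  return 0;
}
```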

@paleolimbot (Member Author)

Just a note that I'm planning on picking this back up after #13706 is merged, since that adds enough infrastructure to our ExecPlan wrapping to get basic cancellation of those.

paleolimbot added a commit that referenced this pull request Sep 16, 2022
…d function (#13706)

This PR adds support for more types of queries that include calls to R code (i.e., `map_batches(..., .lazy = TRUE)`, user-defined functions in mutates, arranges, and filters, and custom extension type behaviour). Previously these queries failed because it wasn't possible to guarantee that the exec plan would be completely executed within a call to `RunWithCapturedR()` where we establish an event loop on the main R thread and launch a background thread to do "arrow stuff" that can queue tasks to run on the R thread.

The approach I took here was to stuff more of the ExecPlan-to-RecordBatchReader logic in a subclass of RecordBatchReader that doesn't call `plan->StartProducing()` until the first batch has been pulled. This lets you return a record batch reader and pass it around at the R level (currently how head/tail/a few other things are implemented), and as long as it's drained all at once (i.e., `reader$read_table()`) the calls into R will work. 

The R code calls within an exec plan *won't* work with `reader$read_next_batch()` or the C data interface because there we can't guarantee an event loop.

This also has the benefit of allowing us to inject some cancelability to the ExecPlan since we can check a StopToken after #13635 (ARROW-11841) for an interrupt (for all exec plans). The biggest benefit is, in my view, that the lifecycle of the ExecPlan is more explicit...before, the plan was stopped when the object was deleted but it was written in a way that I didn't understand for a long time. I think a reader subclass makes it more explicit and maybe will help to print out nested queries (since they're no longer eagerly evaluated).

An example of something that didn't work before that now does:

``` r
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

register_scalar_function(
  "times_32",
  function(context, x) x * 32.0,
  int32(),
  float64(),
  auto_convert = TRUE
)

record_batch(a = 1:1000) %>%
  dplyr::mutate(b = times_32(a)) %>%
  as_record_batch_reader() %>%
  as_arrow_table()
#> Table
#> 1000 rows x 2 columns
#> $a <int32>
#> $b <double>

record_batch(a = 1:1000) %>%
  dplyr::mutate(fun_result = times_32(a)) %>%
  head(11) %>%
  dplyr::collect()
#> # A tibble: 11 × 2
#>        a fun_result
#>    <int>      <dbl>
#>  1     1         32
#>  2     2         64
#>  3     3         96
#>  4     4        128
#>  5     5        160
#>  6     6        192
#>  7     7        224
#>  8     8        256
#>  9     9        288
#> 10    10        320
#> 11    11        352
```

<sup>Created on 2022-07-25 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>

Lead-authored-by: Dewey Dunnington <dewey@voltrondata.com>
Co-authored-by: Dewey Dunnington <dewey@fishandwhistle.net>
Signed-off-by: Dewey Dunnington <dewey@fishandwhistle.net>
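
As an aside, a minimal sketch of the "don't call StartProducing() until the first batch is pulled" pattern described in the commit message above, using only the base arrow::RecordBatchReader interface; the start callback here stands in for plan->StartProducing() and the captured-R event-loop plumbing.

```cpp
// Sketch of a lazy-start RecordBatchReader: the wrapped source is not started
// until the first ReadNext() call, so the reader can be created eagerly and
// passed around, while all the work happens inside whatever loop drains it.
#include <functional>
#include <memory>
#include <utility>

#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <arrow/type.h>

class LazyStartReader : public arrow::RecordBatchReader {
 public:
  LazyStartReader(std::shared_ptr<arrow::Schema> schema,
                  std::function<arrow::Status()> start,
                  std::shared_ptr<arrow::RecordBatchReader> source)
      : schema_(std::move(schema)),
        start_(std::move(start)),
        source_(std::move(source)) {}

  std::shared_ptr<arrow::Schema> schema() const override { return schema_; }

  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override {
    if (!started_) {
      ARROW_RETURN_NOT_OK(start_());  // e.g. plan->StartProducing()
      started_ = true;
    }
    return source_->ReadNext(batch);
  }

 private:
  std::shared_ptr<arrow::Schema> schema_;
  std::function<arrow::Status()> start_;
  std::shared_ptr<arrow::RecordBatchReader> source_;
  bool started_ = false;
};
```
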
@paleolimbot (Member Author) commented Sep 16, 2022

Reprex to test, since it can really only be tested interactively:

# from an Arrow checkout:
# usethis::pr_fetch(13635)
library(arrow, warn.conflicts = FALSE)

tf <- tempfile()
readr::write_csv(vctrs::vec_rep(mtcars, 5e5), tf)

# try to slow down CSV reading
set_cpu_count(1)
set_io_thread_count(2)

# hit Control-C while this line runs!
# (for me this takes about 3 seconds to run without cancelling)
system.time(read_csv_arrow(tf))

# ExecPlans don't cancel as snappily as CSV reading since it's implemented
# at the end of the plan (i.e., we have to wait for a batch to be ready
# before the stop token is checked). To observe meaningful cancellation
# we need a bunch of files in a dataset.
even_more_files <- purrr::map_chr(1:10, function(i) {
  another_tf_copy <- tempfile()
  file.copy(tf, another_tf_copy)
  another_tf_copy
})

# hit Control-C while this line runs!
# (for me this takes about 30 seconds to run without cancelling,
# but with hitting the cancel button I can get it down to 10 seconds)
system.time(open_dataset(c(tf, even_more_files), format = "csv") %>% dplyr::collect())

@nealrichardson (Member)

🎉

> ds <- open_dataset("~/Downloads/nyc-taxi/", partitioning=c("year", "month"))
> collect(ds)
^CError in `dplyr::collect()`:
! Cancelled: Operation cancelled
/Users/enpiar/arrow/cpp/src/arrow/record_batch.cc:337  ReadNext(&batch)
/Users/enpiar/arrow/cpp/src/arrow/record_batch.cc:351  ToRecordBatches(). Detail: received signal 2
Run `rlang::last_error()` to see where the error occurred.
> 

@paleolimbot (Member Author)

@pitrou it looks like pyarrow's signal handler setup does not play nicely with shared objects: https://github.com/apache/arrow/actions/runs/3068370409/jobs/4955750729#step:7:28810

@pitrou (Member) commented Sep 16, 2022

> @pitrou it looks like pyarrow's signal handler setup does not play nicely with shared objects: https://github.com/apache/arrow/actions/runs/3068370409/jobs/4955750729#step:7:28810

What is the test doing exactly?

fut.MarkFinished(result);
});
fut.MarkFinished(result);
});

return fut;
});

thread_ptr->join();
Member:

Do you want to check that the thread is initialized before trying to join it?
(better than a hard crash)

Member Author:

I added a check on result.ok() here, was that what you had in mind?

Member:

No, I meant something like:

if (thread.joinable()) { thread.join(); }

Member Author:

Done!


// This is an object whose scope ensures we do not register signal handlers when
// evaluating R code when that evaluation happens via SafeCallIntoR.
class SafeCallIntoRContext {
Member:

The difference between "SafeCallIntoRContext" and "RunWithCapturedRContext" isn't obvious without going to the source and reading the docstrings. Would you have better names in mind to ease reading code?

Member Author:

I changed these names to `WithSignalHandlerContext` and `WithoutSignalHandlerContext`.

}

~RunWithCapturedRContext() {
if (MainRThread::GetInstance().SignalStopSourceEnabled()) {
Member:

Shouldn't you do this only if Init has succeeded?

Member Author:

Done!


~SafeCallIntoRContext() {
if (!MainRThread::GetInstance().IsMainThread() &&
MainRThread::GetInstance().SignalStopSourceEnabled()) {
Member:

Should you instead test a boolean flag that was set to true in the constructor if UnregisterCancellingSignalHandler was called?

Member Author:

Done!

}

void DisableSignalStopSource() {
if (SignalStopSourceEnabled()) {
Member:

Should you test stop_source_ instead?

Member Author:

SignalStopSourceEnabled() checks for nullptr...is there a better check I should do on stop_source_? In the forked-process scenario, it looks like stop_source_ is not null but the signal registration fails anyway. I downgraded that to a warning, but perhaps I can catch that here to avoid attempting signal registration.

Member:

That's more of a bug on the C++ side, I think. The signal StopSource code is not sanitized for forking currently.

Another resolved (outdated) review thread on r/src/safe-call-into-r.h.
@@ -213,7 +318,7 @@ static inline arrow::Status RunWithCapturedRIfPossibleVoid(
return true;
});
ARROW_RETURN_NOT_OK(result);
Member:

This line is useless now?

Member Author:

Removed!

@pitrou (Member) commented Sep 27, 2022

Is the test-r-linux-valgrind failure expected?

@paleolimbot (Member Author)

The valgrind error is not new, but it is also not fixed by this PR (I had hoped my changes to RunWithCapturedR, in particular the part where I properly return an error status instead of throwing an exception, would fix this). I created ARROW-17879 for that.

A resolved (outdated) review thread on r/src/compute-exec.cpp.
@@ -77,8 +89,9 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
}
});

thread_ptr->join();
delete thread_ptr;
if (thread.joinable()) {
Member:

No need for the if here as the thread is always launched above.

paleolimbot and others added 2 commits September 28, 2022 11:42
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
@pitrou (Member) commented Oct 3, 2022

I've restarted the failed R builds. Is this otherwise ready for merging @paleolimbot ?

@paleolimbot (Member Author)

From my end, yes!

@paleolimbot merged commit 0aacc28 into apache:master on Oct 3, 2022
@ursabot commented Oct 3, 2022

Benchmark runs are scheduled for baseline = ec579df and contender = 0aacc28. 0aacc28 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.0% ⬆️0.0%] test-mac-arm
[Failed ⬇️0.27% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️1.0% ⬆️0.0%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 0aacc28c ec2-t3-xlarge-us-east-2
[Failed] 0aacc28c test-mac-arm
[Failed] 0aacc28c ursa-i9-9960x
[Finished] 0aacc28c ursa-thinkcentre-m75q
[Finished] ec579df6 ec2-t3-xlarge-us-east-2
[Failed] ec579df6 test-mac-arm
[Failed] ec579df6 ursa-i9-9960x
[Finished] ec579df6 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@paleolimbot deleted the r-cancelme branch on October 4, 2022 17:36
zagto pushed a commit to zagto/arrow that referenced this pull request Oct 7, 2022
…d function (apache#13706)

fatemehp pushed a commit to fatemehp/arrow that referenced this pull request Oct 17, 2022
…d function (apache#13706)

fatemehp pushed a commit to fatemehp/arrow that referenced this pull request Oct 17, 2022
…13635)

Lead-authored-by: Dewey Dunnington <dewey@fishandwhistle.net>
Co-authored-by: Dewey Dunnington <dewey@voltrondata.com>
Signed-off-by: Dewey Dunnington <dewey@voltrondata.com>
5 participants