diff --git a/r/R/array-stream.R b/r/R/array-stream.R index 54863a4b7..3b3472cb9 100644 --- a/r/R/array-stream.R +++ b/r/R/array-stream.R @@ -259,3 +259,13 @@ names.nanoarrow_array_stream <- function(x, ...) { `$.nanoarrow_array_stream` <- function(x, i, ...) { x[[i]] } + +nanoarrow_array_stream_get_next_async <- function(stream, callback, + schema = stream$get_schema()) { + callback_env <- new.env(parent = emptyenv()) + callback_env$callback <- callback + array <- nanoarrow_allocate_array() + nanoarrow_array_set_schema(array, schema, validate = FALSE) + .Call(nanoarrow_c_array_stream_get_next_async, stream, array, callback_env) + invisible(NULL) +} diff --git a/r/src/nanoarrow_cpp.cc b/r/src/nanoarrow_cpp.cc index 031c8cd78..c08594174 100644 --- a/r/src/nanoarrow_cpp.cc +++ b/r/src/nanoarrow_cpp.cc @@ -216,8 +216,6 @@ class CallbackQueue { void* return_value; }; - void AddTask(std::unique_ptr task) { tasks_.push_back(std::move(task)); } - void AddCallback(RCallback callback) { std::lock_guard lock(callbacks_lock_); pending_callbacks_.push_back(callback); @@ -251,6 +249,7 @@ class CallbackQueue { PROTECT(Rf_lang3(callback_sym, return_code_sexp, return_value_xptr)); Rf_eval(callback_call, env_sexp); + UNPROTECT(6); } static CallbackQueue& GetInstance() { @@ -259,7 +258,6 @@ class CallbackQueue { } private: - std::deque> tasks_; std::deque pending_callbacks_; std::mutex callbacks_lock_; }; @@ -279,13 +277,11 @@ extern "C" void nanoarrow_array_stream_get_next_async(SEXP array_stream_xptr, array}; // Bad: need to keep track of these somewhere instead of leaking them all - std::unique_ptr worker(new std::thread([array_stream, array, callback] { + std::thread* worker = new std::thread([array_stream, array, callback] { CallbackQueue::RCallback callback_out = callback; callback_out.return_code = array_stream->get_next(array_stream, array); CallbackQueue::GetInstance().AddCallback(callback_out); - })); - - CallbackQueue::GetInstance().AddTask(std::move(worker)); + }); } extern "C" int nanoarrow_run_callbacks(void) { diff --git a/r/tests/testthat/test-array-stream.R b/r/tests/testthat/test-array-stream.R index 195c31cc8..8553f614f 100644 --- a/r/tests/testthat/test-array-stream.R +++ b/r/tests/testthat/test-array-stream.R @@ -212,3 +212,13 @@ test_that("Errors from user array stream finalizer are ignored", { expect_false(nanoarrow_pointer_is_valid(stream)) expect_silent(stream$release()) }) + +test_that("Async get next", { + stream <- basic_array_stream(list(1:5)) + nanoarrow_array_stream_get_next_async(stream, function(code, array) { + expect_identical(code, 0L) + expect_identical(convert_array(array), 1:5) + }) + Sys.sleep(0.5) + expect_identical(run_callbacks(), 1L) +})