Skip to content

Commit

Permalink
with passing test
Browse files Browse the repository at this point in the history
  • Loading branch information
paleolimbot committed Jun 7, 2023
1 parent adf7516 commit 3f6214c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
10 changes: 10 additions & 0 deletions r/R/array-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,13 @@ names.nanoarrow_array_stream <- function(x, ...) {
`$.nanoarrow_array_stream` <- function(x, i, ...) {
x[[i]]
}

nanoarrow_array_stream_get_next_async <- function(stream, callback,
schema = stream$get_schema()) {
callback_env <- new.env(parent = emptyenv())
callback_env$callback <- callback
array <- nanoarrow_allocate_array()
nanoarrow_array_set_schema(array, schema, validate = FALSE)
.Call(nanoarrow_c_array_stream_get_next_async, stream, array, callback_env)
invisible(NULL)
}
10 changes: 3 additions & 7 deletions r/src/nanoarrow_cpp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ class CallbackQueue {
void* return_value;
};

void AddTask(std::unique_ptr<std::thread> task) { tasks_.push_back(std::move(task)); }

void AddCallback(RCallback callback) {
std::lock_guard<std::mutex> lock(callbacks_lock_);
pending_callbacks_.push_back(callback);
Expand Down Expand Up @@ -251,6 +249,7 @@ class CallbackQueue {
PROTECT(Rf_lang3(callback_sym, return_code_sexp, return_value_xptr));

Rf_eval(callback_call, env_sexp);
UNPROTECT(6);
}

static CallbackQueue& GetInstance() {
Expand All @@ -259,7 +258,6 @@ class CallbackQueue {
}

private:
std::deque<std::unique_ptr<std::thread>> tasks_;
std::deque<RCallback> pending_callbacks_;
std::mutex callbacks_lock_;
};
Expand All @@ -279,13 +277,11 @@ extern "C" void nanoarrow_array_stream_get_next_async(SEXP array_stream_xptr,
array};

// Bad: need to keep track of these somewhere instead of leaking them all
std::unique_ptr<std::thread> worker(new std::thread([array_stream, array, callback] {
std::thread* worker = new std::thread([array_stream, array, callback] {
CallbackQueue::RCallback callback_out = callback;
callback_out.return_code = array_stream->get_next(array_stream, array);
CallbackQueue::GetInstance().AddCallback(callback_out);
}));

CallbackQueue::GetInstance().AddTask(std::move(worker));
});
}

extern "C" int nanoarrow_run_callbacks(void) {
Expand Down
10 changes: 10 additions & 0 deletions r/tests/testthat/test-array-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,13 @@ test_that("Errors from user array stream finalizer are ignored", {
expect_false(nanoarrow_pointer_is_valid(stream))
expect_silent(stream$release())
})

test_that("Async get next", {
stream <- basic_array_stream(list(1:5))
nanoarrow_array_stream_get_next_async(stream, function(code, array) {
expect_identical(code, 0L)
expect_identical(convert_array(array), 1:5)
})
Sys.sleep(0.5)
expect_identical(run_callbacks(), 1L)
})

0 comments on commit 3f6214c

Please sign in to comment.