Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(r): Add async infrastructure for specific methods #985

Draft
wants to merge 36 commits into
base: main
Choose a base branch
from

Conversation

paleolimbot
Copy link
Member

@paleolimbot paleolimbot commented Aug 18, 2023

This is a follow-up to apache/arrow-nanoarrow#211 , which was closed because the async functionality for interacting with the array stream is more closely tied to the potential async functionality of other ADBC methods.

This can/should remain in draft form for some time...async is hard and this PR is currently intended to act as a starting place to discuss what this would look like.

I'm aware that this can/perhaps should be using later and/or promises and/or futures. I don't know exactly what that would look like but happy to discuss!

Thanks to krlmlr for pushing for this functionality!

A reprex to get started:

# pak::pak("apache/arrow-adbc/r/adbcdrivermanager#985")
queue <- adbcdrivermanager:::adbc_callback_queue()

# Add a few callbacks
stream <- nanoarrow::basic_array_stream(list(1:5))
adbcdrivermanager:::adbc_array_stream_get_next_async(stream, function(array) {
  message("Done 1!")
  print(as.vector(array))
}, queue = queue)

stream <- nanoarrow::basic_array_stream(list(6:10))
adbcdrivermanager:::adbc_array_stream_get_next_async(stream, function(array) {
  message("Done 2!")
  print(as.vector(array))
}, queue = queue)

stream <- nanoarrow::basic_array_stream(list(11:15))
adbcdrivermanager:::adbc_array_stream_get_next_async(stream, function(array) {
  message("Done 3!")
  print(as.vector(array))
}, queue = queue)

Sys.sleep(0.5)

# Run callback
(adbcdrivermanager:::adbc_callback_queue_run_pending(queue))
#> Done 1!
#> [1] 1 2 3 4 5
#> Done 2!
#> [1]  6  7  8  9 10
#> Done 3!
#> [1] 11 12 13 14 15
#> [1] 3

# Make sure gc() doesn't crash
gc()
#>           used (Mb) gc trigger (Mb) limit (Mb) max used (Mb)
#> Ncells  712327 38.1    1340807 71.7         NA  1340807 71.7
#> Vcells 1268606  9.7    8388608 64.0      16384  1996965 15.3

Created on 2023-08-18 with reprex v2.0.2

Copy link
Collaborator

@krlmlr krlmlr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, nice! I'll need to do more research to find out what's needed to integrate it with later and promises.

r/adbcdrivermanager/tests/testthat/test-async.R Outdated Show resolved Hide resolved
@paleolimbot
Copy link
Member Author

Far from usable, but it works! Probably needs some version of "read everything from stream with a callback for each batch...I'm not sure that fits nicely into promises since there are an indeterminate number of batches.

library(adbcdrivermanager)

db <- adbc_database_init(adbcsqlite::adbcsqlite())
con <- adbc_connection_init(db)

flights <- nycflights13::flights
flights$time_hour <- NULL
flights |>
  write_adbc(con, "flights")


# Sync
stmt <- adbc_statement_init(con)
adbc_statement_set_sql_query(stmt, "SELECT * from flights")
stream <- nanoarrow::nanoarrow_allocate_array_stream()
adbc_statement_execute_query(stmt, stream)
#> [1] -1
stream
#> <nanoarrow_array_stream struct<year: int64, month: int64, day: int64, dep_time: int64, sched_dep_time: int64, dep_delay: double, arr_time: int64, sched_arr_time: int64, arr_delay: double, carrier: string, flight: int64, tailnum: string, origin: string, dest: string, air_time: double, distance: double, hour: double, minute: double>>
#>  $ get_schema:function ()  
#>  $ get_next  :function (schema = x$get_schema(), validate = TRUE)  
#>  $ release   :function ()

value <- NULL


# Async
(promise <- con |> 
  adbc_statement_init() |> 
  adbc_statement_set_sql_query("SELECT * from flights") |> 
  adbcdrivermanager:::adbc_statement_execute_query_promise() |> 
  promises::then(function(stream) {
    adbcdrivermanager:::adbc_array_stream_get_next_promise(stream)
  }) |> 
  promises::then(function(array) {
    tibble::as_tibble(array)
  }) |> 
  promises::then(function(tbl) {
    value <<- tbl
  }))
#> <Promise [pending]>

value
#> NULL

while(!later::loop_empty()) {
  later::run_now()
}

promise
#> <Promise [fulfilled: tbl_df]>
value
#> # A tibble: 1,024 × 18
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <dbl> <dbl> <dbl>    <dbl>          <dbl>     <dbl>    <dbl>          <dbl>
#>  1  2013     1     1      517            515         2      830            819
#>  2  2013     1     1      533            529         4      850            830
#>  3  2013     1     1      542            540         2      923            850
#>  4  2013     1     1      544            545        -1     1004           1022
#>  5  2013     1     1      554            600        -6      812            837
#>  6  2013     1     1      554            558        -4      740            728
#>  7  2013     1     1      555            600        -5      913            854
#>  8  2013     1     1      557            600        -3      709            723
#>  9  2013     1     1      557            600        -3      838            846
#> 10  2013     1     1      558            600        -2      753            745
#> # ℹ 1,014 more rows
#> # ℹ 10 more variables: arr_delay <dbl>, carrier <chr>, flight <dbl>,
#> #   tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
#> #   hour <dbl>, minute <dbl>

Created on 2023-09-05 with reprex v2.0.2

@krlmlr
Copy link
Collaborator

krlmlr commented Sep 6, 2023

Wow, amazing! Let me digest this, but your use of promises looks like the thing I was looking for.

Copy link
Collaborator

@krlmlr krlmlr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Dewey. I really had to wait for shikokuchuo/mirai#97 (reply in thread) to be able to review this.

Will it still work? Can we get an API similarly to that for mirai where tasks can be turned into promises just by calling promises::as.promise() ?

R_ReleaseObject(return_value_xptr);

// Release the dependence of the task on this callback queue
SEXP task_sym = PROTECT(Rf_install("task"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are typically fine without protection.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will probably leave the PROTECTs in to keep the pattern of "protect all SEXPs" while in use (although I can see how the value of a variable in some other environment and the symbol to which it was assigned would be already protected).

Comment on lines 196 to 197
// thrown. cpp11 handles this using BEGIN_CPP11 an END_CPP11...we would need
// similar for safety here.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you consider vendoring cpp11?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't personally see the value proposition for a low-level project like this one that wraps a C API (where we already have a non-exception-based error handling scheme and zero C++ idioms that integrate). Debugging a problem that is many layers deep in a C++ template is also the worst (and opening an issue or PR has not historically gotten a timely response).

auto task = reinterpret_cast<Task*>(R_ExternalPtrAddr(task_xptr));
CallbackQueue::RCallback callback =
queue->InitCallback(callback_env, out_stream_xptr, error_xptr);
task->worker = new std::thread([statement, out_stream, error, callback, queue] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does std have a notion of a thread pool, or will perhaps system threads be reused transparently?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty sure this will launch a new thread every time. I will have to check C++17 since I think we require that now.

});

UNPROTECT(1);
return R_NilValue;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we return task_xptr here, and in the R layer create a promise bound to this specific task?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will have to revisit what's happening here but I think that the callback_env is what lets it be converted to a promise.

Copy link
Member Author

@paleolimbot paleolimbot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be too bad to rebase...I'll circle back in the next day or two and tackle some of the TODOs I left myself.

R_ReleaseObject(return_value_xptr);

// Release the dependence of the task on this callback queue
SEXP task_sym = PROTECT(Rf_install("task"));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will probably leave the PROTECTs in to keep the pattern of "protect all SEXPs" while in use (although I can see how the value of a variable in some other environment and the symbol to which it was assigned would be already protected).

Comment on lines 196 to 197
// thrown. cpp11 handles this using BEGIN_CPP11 an END_CPP11...we would need
// similar for safety here.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't personally see the value proposition for a low-level project like this one that wraps a C API (where we already have a non-exception-based error handling scheme and zero C++ idioms that integrate). Debugging a problem that is many layers deep in a C++ template is also the worst (and opening an issue or PR has not historically gotten a timely response).

auto task = reinterpret_cast<Task*>(R_ExternalPtrAddr(task_xptr));
CallbackQueue::RCallback callback =
queue->InitCallback(callback_env, out_stream_xptr, error_xptr);
task->worker = new std::thread([statement, out_stream, error, callback, queue] {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty sure this will launch a new thread every time. I will have to check C++17 since I think we require that now.

});

UNPROTECT(1);
return R_NilValue;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will have to revisit what's happening here but I think that the callback_env is what lets it be converted to a promise.

@github-actions github-actions bot added this to the ADBC Libraries 1.0.0 milestone May 3, 2024
@lidavidm lidavidm removed this from the ADBC Libraries 1.0.0 milestone May 3, 2024
@paleolimbot
Copy link
Member Author

@krlmlr I updated this a bit with a new approach that floated in as I was working on some other things. The callback-based approach came from me working with Arrow C++, where we have a lot of helpers for this kind of thing. I think the new approach (based on std::future) is simpler and generalizes better; however I'm happy to revert to the previous approach if you think it will integrate better. I wish that later provided a cleaner way to poll something than to reschedule a task every xxx seconds...obviously polling is suboptimal but I think here we're stuck doing it one way or another.

It would be nice to figure out how read_adbc_async() or write_adbc_async() or execute_adbc_async() might look...read_adbc_async() would be the most difficult because there are at 2-3 async calls that might have to get chained together (or we could wrap them all in a single async task that collects the entire array stream). I think there also needs to be some way to "lock" a connection to prevent other things from happening while stuff is happening on another thread (or maybe that's built in to the drivers?).

The code I'm using to play with this is:

adbc_async_sleep(10000) |> 
  adbc_async_task_wait()

...to ensure that adbc_async_task_wait() can be interrupted and

library(adbcdrivermanager)

flights <- nycflights13::flights

con <- adbc_database_init(adbcpostgresql::adbcpostgresql(), 
                          uri = Sys.getenv("ADBC_POSTGRESQL_TEST_URI")) |> 
  adbc_connection_init()

stmt <- adbc_statement_init(con, adbc.ingest.target_table = "flights")
adbc_statement_bind_stream(stmt, flights)
task <- adbc_statement_execute_query_async(stmt)
# You should be able to request cancel using Control-C here
adbc_async_task_wait(task)

...for a more real-world scenario.

@krlmlr
Copy link
Collaborator

krlmlr commented May 11, 2024

Thanks, Dewey, nice!

Regarding chaining: Can we chain promises or futures in C++?

Regarding polling: How does mirai do it? Have you seen the C++ interfaces in later and promises? We might need to add a hard dependency, though, or move the async logic to a package that doesn't mind that dependency.

@paleolimbot
Copy link
Member Author

I'll play with this a bit more and try to get this down to a minimum viable feature that one or both of us can build on.

Regarding polling: How does mirai do it?

It looks like this happens mostly in nanonext through a runtime dependency on later's C export:

https://github.com/shikokuchuo/nanonext/blob/b139a84512140ac5643f8f3ecb76a6ce8221c531/src/aio.c#L1292

...which is perhaps a lower-level version of just vendoring the later function definition: https://github.com/r-lib/later/blob/968542a0b42fa191e6a5a57f180a6af345083bf5/inst/include/later.h#L50

I can play with this a little but off the top of my head, I think here one could pass an optional R callback when the async method is launched, which would invoke the runtime "later" dependency (this would replace the ugly later-scheduling itself thing I have here). If there's no callback, one can still poll the adbc_async_task.

Regarding chaining: Can we chain promises or futures in C++?

As long as we know ahead of time what has to happen and it can all happen on another thread, I think we can just define an async launcher that does it all and wait for it to finish. In this case, it would be AdbcStatementExecuteQuery + ArrowArrayStream::get_schema() + a bunch of ArrowArrayStream::get_next(). If we really need the flexibility to compose things, R-level promise things should work.

move the async logic to a package that doesn't mind that dependency.

With the runtime loading of the later C callable I think we can do this here, although ADBC is quite nice for this because we could in theory document how to get/call the methods (all of this is ABI stable!).

@krlmlr
Copy link
Collaborator

krlmlr commented May 13, 2024

Thanks, Dewey. I like this idea I read here:

  • Keep {adbdrivermanager} free of the strong {later} dependency
  • Instead, support a callback that can bring in {later}/{promises}
    • {adbcdrivermanager} could even provide a default callback that uses {later} and fails if the package is not installed
  • Polling is always available

Why do we need to tightly couple execution and data retrieval? I could imagine the following process:

  • Creation of the (parameterized or not) query sends it to the server, returns a "bound" or "unbound" promise
  • Optional: Parameter binding can accept that "unbound" promise and return a new "bound" promise
  • Schema retrieval accepts that "bound" promise and returns a new "ready" promise
  • Retrieval of each chunk accepts a "ready" promise and returns another "ready" promise
  • When complete, no more promises are returned
graph
    PQ("Creation of the parameterized query") --> |Unbound| PB("Parameter binding")
    PB("Parameter binding") --> |Bound| SR
    NPQ("Creation of the not parameterized query") --> |Bound| SR
    SR("Schema retrieval") --> |Ready| RC
    RC("Retrieval of each chunk") -.-> |Ready| RC
    RC -.-> |NULL| End(Done)
Loading

image

@paleolimbot
Copy link
Member Author

Nice! I have a concrete idea in my head about how the first things you described might go but I'm stuck as to how to compose fetching an indeterminate number of results by composing promises (without potentially costly overhead of waiting for a promise to resolve before launching another thread and waiting for that). Apologies if there's a simple solution there!

some_con |>
  adbc_async_prepare() %...>%
  adbc_async_execute_query() %...>%
  # What happens here?
  (function(array_stream) { what_next() })

Why do we need to tightly couple execution and data retrieval?

I have a more concrete idea in my head about how one might fetch an entire result (as a vector of C-level arrays) and then call a callback when that has completed. For example:

some_con |>
  adbc_async_prepare() %...>%
  adbc_async_execute_and_read_all() %...>%
  # This would be a basic_array_stream whose arrays have already been fetched
  nanoarrow::convert_array_stream()

Ideally we'd be building up the data frame result on the R thread while waiting for the next batch (but the infrastructure for that isn't merged into nanoarrow yet).

@krlmlr
Copy link
Collaborator

krlmlr commented May 20, 2024

Thanks, Dewey. I was really thinking about:

some_con |>
  adbc_async_prepare() %...>%
  adbc_async_execute_query() %...>%
  adbc_async_fetch_chunk() %...>%
  adbc_async_fetch_chunk() %...>%
  adbc_async_fetch_chunk() %...>%
  ...

With:

adbc_async_fetch_chunk <- function() {
  if (fetch_thread_running()) {
    ask_fetch_thread_get_more_data()
  } else {
    start_new_fetch_thread()
  }
}

Avoiding race conditions is left as an exercise to the reader 🤣

@krlmlr
Copy link
Collaborator

krlmlr commented Jun 16, 2024

Thanks for the effort! I see there are tests with sleeping, does this code already work with database connections? Can you share a working setup?

@paleolimbot
Copy link
Member Author

The working example I have is only with queries that don't require reading in async (the reading in async is implemented but I haven't played with it much).

library(adbcdrivermanager)

# docker compose up postgres-test
test_uri <- "postgresql://localhost:5432/postgres?user=postgres&password=password"

# Needs to be big or else it doesn't take long enough to test that cancellation works
flights <- vctrs::vec_rep(nycflights13::flights, 10)
flights <- nanoarrow::as_nanoarrow_array_stream(flights)

con <- adbc_database_init(adbcpostgresql::adbcpostgresql(), uri = test_uri) |> 
  adbc_connection_init()

stmt <- adbc_statement_init(con, adbc.ingest.target_table = "flights")
adbc_statement_bind_stream(stmt, flights)
task <- adbcdrivermanager:::adbc_statement_execute_query_async(stmt)

# You should be able to request cancel using Control-C here
adbcdrivermanager:::adbc_async_task_wait(task)
#> $statement
#> <adbcpostgresql_statement at 0x11d785370> 
#> List of 1
#>  $ connection:<adbcpostgresql_connection at 0x11d7e0c10> 
#> List of 1
#>   ..$ database:<adbcpostgresql_database at 0x11d7a47c0> 
#> List of 1
#>   .. ..$ driver:<adbcpostgresql_driver_postgresql> List of 2
#>   .. .. ..$ driver_init_func:Class 'adbc_driver_init_func' <externalptr> 
#>   .. .. ..$ .child_count    : int 0
#> 
#> $stream
#> NULL
#> 
#> $rows_affected
#> [1] 3367760

# In case you want to try this again
# con |> execute_adbc("DROP TABLE flights;")

Created on 2024-06-16 with reprex v2.1.0

Another example I was playing with was using the "sleep" version to ensure that it wasn't too costly to make these async calls:

bench::mark(
  a1 = adbcdrivermanager:::adbc_async_sleep(1) |> adbcdrivermanager:::adbc_async_task_wait(),
  a10 = adbcdrivermanager:::adbc_async_sleep(10) |> adbcdrivermanager:::adbc_async_task_wait(),
  a100 = adbcdrivermanager:::adbc_async_sleep(100) |> adbcdrivermanager:::adbc_async_task_wait(),
  a500 = adbcdrivermanager:::adbc_async_sleep(500) |> adbcdrivermanager:::adbc_async_task_wait(),
  check = FALSE
)
#> # A tibble: 4 × 6
#>   expression      min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr> <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 a1           1.14ms    1.4ms    704.       338KB        0
#> 2 a10         10.52ms   12.5ms     81.0       560B        0
#> 3 a100       100.84ms  103.6ms      9.71      560B        0
#> 4 a500       497.31ms  497.3ms      2.01      560B        0

Created on 2024-06-16 with reprex v2.1.0

You should be able to try these with promises by turning the task into a promise using as.promise(). Feel free to post your adventures here so that I have better examples to test with!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants