diff --git a/r/NAMESPACE b/r/NAMESPACE
index 6cdf8c3f9c1..e98cdd51fb7 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -61,6 +61,7 @@ S3method(as_data_type,pyarrow.lib.DataType)
 S3method(as_data_type,pyarrow.lib.Field)
 S3method(as_record_batch,RecordBatch)
 S3method(as_record_batch,Table)
+S3method(as_record_batch,arrow_dplyr_query)
 S3method(as_record_batch,data.frame)
 S3method(as_record_batch,pyarrow.lib.RecordBatch)
 S3method(as_record_batch,pyarrow.lib.Table)
@@ -227,6 +228,7 @@ export(ReadableFile)
 export(RecordBatch)
 export(RecordBatchFileReader)
 export(RecordBatchFileWriter)
+export(RecordBatchReader)
 export(RecordBatchStreamReader)
 export(RecordBatchStreamWriter)
 export(RoundMode)
diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R
index 1675bce7b85..72f9dec2763 100644
--- a/r/R/dataset-scan.R
+++ b/r/R/dataset-scan.R
@@ -153,7 +153,7 @@ head.Scanner <- function(x, n = 6L, ...) {
 
 #' @export
 tail.Scanner <- function(x, n = 6L, ...) {
-  tail_from_batches(dataset___Scanner__ScanBatches(x), n)
+  tail_from_batches(dataset___Scanner__ScanBatches(x), n)$read_table()
 }
 
 tail_from_batches <- function(batches, n) {
@@ -169,43 +169,57 @@ tail_from_batches <- function(batches, n) {
     if (n <= 0) break
   }
   # rev() the result to put the batches back in the right order
-  Table$create(!!!rev(result))
+  RecordBatchReader$create(batches = rev(result))
 }
 
 #' Apply a function to a stream of RecordBatches
 #'
 #' As an alternative to calling `collect()` on a `Dataset` query, you can
 #' use this function to access the stream of `RecordBatch`es in the `Dataset`.
-#' This lets you aggregate on each chunk and pull the intermediate results into
-#' a `data.frame` for further aggregation, even if you couldn't fit the whole
-#' `Dataset` result in memory.
+#' This lets you do more complex operations in R that operate on chunks of data
+#' without having to hold the entire Dataset in memory at once. You can include
+#' `map_batches()` in a dplyr pipeline and do additional dplyr methods on the
+#' stream of data in Arrow after it.
 #'
-#' This is experimental and not recommended for production use.
+#' Note that, unlike the core dplyr methods that are implemented in the Arrow
+#' query engine, `map_batches()` is not lazy: it starts evaluating on the data
+#' when you call it, even if you send its result to another pipeline function.
+#'
+#' This is experimental and not recommended for production use. It is also
+#' single-threaded and runs in R, not C++, so it won't be as fast as core
+#' Arrow methods.
 #'
 #' @param X A `Dataset` or `arrow_dplyr_query` object, as returned by the
 #'   `dplyr` methods on `Dataset`.
 #' @param FUN A function or `purrr`-style lambda expression to apply to each
-#'   batch
+#'   batch. It must return a RecordBatch or something coercible to one via
+#'   `as_record_batch()`.
 #' @param ... Additional arguments passed to `FUN`
-#' @param .data.frame logical: collect the resulting chunks into a single
-#'   `data.frame`? Default `TRUE`
+#' @param .data.frame Deprecated argument, ignored
+#' @return An `arrow_dplyr_query`.
 #' @export
-map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
-  # TODO: ARROW-15271 possibly refactor do_exec_plan to return a RecordBatchReader
+map_batches <- function(X, FUN, ..., .data.frame = NULL) {
+  if (!is.null(.data.frame)) {
+    warning(
+      "The .data.frame argument is deprecated. ",
+      "Call collect() on the result to get a data.frame.",
+      call. = FALSE
+    )
+  }
   plan <- ExecPlan$create()
   final_node <- plan$Build(as_adq(X))
   reader <- plan$Run(final_node)
   FUN <- as_mapper(FUN)
-  # TODO: wrap batch in arrow_dplyr_query with X$selected_columns,
-  # X$temp_columns, and X$group_by_vars
-  # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
+  # TODO: for future consideration
+  # * Move eval to C++ and make it a generator so it can stream, not block
+  # * Accept an output schema argument: with that, we could make this lazy (via collapse)
   batch <- reader$read_next_batch()
   res <- vector("list", 1024)
   i <- 0L
   while (!is.null(batch)) {
     i <- i + 1L
-    res[[i]] <- FUN(batch, ...)
+    res[[i]] <- as_record_batch(FUN(batch, ...))
     batch <- reader$read_next_batch()
   }
 
@@ -214,13 +228,7 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
     res <- res[seq_len(i)]
   }
 
-  if (.data.frame & inherits(res[[1]], "arrow_dplyr_query")) {
-    res <- dplyr::bind_rows(map(res, dplyr::collect))
-  } else if (.data.frame) {
-    res <- dplyr::bind_rows(map(res, as.data.frame))
-  }
-
-  res
+  RecordBatchReader$create(batches = res)
 }
 
 #' @usage NULL
diff --git a/r/R/dplyr-collect.R b/r/R/dplyr-collect.R
index 3586dda33e8..1c875b15b02 100644
--- a/r/R/dplyr-collect.R
+++ b/r/R/dplyr-collect.R
@@ -27,8 +27,10 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) {
   }
 
   # See query-engine.R for ExecPlan/Nodes
+  plan <- ExecPlan$create()
+  final_node <- plan$Build(x)
   tryCatch(
-    tab <- do_exec_plan(x),
+    tab <- plan$Run(final_node)$read_table(),
     # n = 4 because we want the error to show up as being from collect()
     # and not handle_csv_read_error()
     error = function(e, call = caller_env(n = 4)) {
@@ -36,6 +38,19 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) {
     }
   )
 
+  # TODO(ARROW-16607): move KVM handling into ExecPlan
+  if (ncol(tab)) {
+    # Apply any column metadata from the original schema, where appropriate
+    new_r_metadata <- get_r_metadata_from_old_schema(
+      tab$schema,
+      source_data(x)$schema,
+      drop_attributes = has_aggregation(x)
+    )
+    if (!is.null(new_r_metadata)) {
+      tab$r_metadata <- new_r_metadata
+    }
+  }
+
   if (as_data_frame) {
     df <- as.data.frame(tab)
     restore_dplyr_features(df, x)
diff --git a/r/R/duckdb.R b/r/R/duckdb.R
index 1823ea7315d..3951362f8ed 100644
--- a/r/R/duckdb.R
+++ b/r/R/duckdb.R
@@ -123,11 +123,7 @@ duckdb_disconnector <- function(con, tbl_name) {
 #' other processes (like DuckDB).
 #'
 #' @param .data the object to be converted
-#' @param as_arrow_query should the returned object be wrapped as an
-#'   `arrow_dplyr_query`? (logical, default: `TRUE`)
-#'
-#' @return a `RecordBatchReader` object, wrapped as an arrow dplyr query which
-#'   can be used in dplyr pipelines.
+#' @return A `RecordBatchReader`.
 #' @export
 #'
 #' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
 #'
@@ -142,7 +138,7 @@ duckdb_disconnector <- function(con, tbl_name) {
 #'   summarize(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
 #'   to_arrow() %>%
 #'   collect()
-to_arrow <- function(.data, as_arrow_query = TRUE) {
+to_arrow <- function(.data) {
   # If this is an Arrow object already, return quickly since we're already Arrow
   if (inherits(.data, c("arrow_dplyr_query", "ArrowObject"))) {
     return(.data)
   }
@@ -161,9 +157,5 @@ to_arrow <- function(.data, as_arrow_query = TRUE) {
   # Run the query
   res <- DBI::dbSendQuery(dbplyr::remote_con(.data), dbplyr::remote_query(.data), arrow = TRUE)
 
-  if (as_arrow_query) {
-    arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))
-  } else {
-    duckdb::duckdb_fetch_record_batch(res)
-  }
+  duckdb::duckdb_fetch_record_batch(res)
 }
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index 15563d62f19..e4cc4197b34 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -15,36 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-do_exec_plan <- function(.data) {
-  plan <- ExecPlan$create()
-  final_node <- plan$Build(.data)
-  tab <- plan$Run(final_node)
-  # TODO (ARROW-14289): make the head/tail methods return RBR not Table
-  if (inherits(tab, "RecordBatchReader")) {
-    tab <- tab$read_table()
-  }
-
-  # If arrange() created $temp_columns, make sure to omit them from the result
-  # We can't currently handle this in the ExecPlan itself because sorting
-  # happens in the end (SinkNode) so nothing comes after it.
-  if (length(final_node$sort$temp_columns) > 0) {
-    tab <- tab[, setdiff(names(tab), final_node$sort$temp_columns), drop = FALSE]
-  }
-
-  if (ncol(tab)) {
-    # Apply any column metadata from the original schema, where appropriate
-    new_r_metadata <- get_r_metadata_from_old_schema(
-      tab$schema,
-      source_data(.data)$schema,
-      drop_attributes = has_aggregation(.data)
-    )
-    if (!is.null(new_r_metadata)) {
-      tab$r_metadata <- new_r_metadata
-    }
-  }
-  tab
-}
-
 ExecPlan <- R6Class("ExecPlan",
   inherit = ArrowObject,
   public = list(
@@ -220,17 +190,27 @@ ExecPlan <- R6Class("ExecPlan",
         # just use it to take the random slice
         slice_size <- node$head %||% node$tail
         if (!is.null(slice_size)) {
-          # TODO (ARROW-14289): make the head methods return RBR not Table
           out <- head(out, slice_size)
         }
         # Can we now tell `self$Stop()` to StopProducing? We already have
         # everything we need for the head (but it seems to segfault: ARROW-14329)
       } else if (!is.null(node$tail)) {
         # Reverse the row order to get back what we expect
-        # TODO: don't return Table, return RecordBatchReader
         out <- out$read_table()
         out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
+        # Put back into RBR
+        out <- as_record_batch_reader(out)
       }
+
+      # If arrange() created $temp_columns, make sure to omit them from the result
+      # We can't currently handle this in ExecPlan_run itself because sorting
+      # happens in the end (SinkNode) so nothing comes after it.
+      if (length(node$sort$temp_columns) > 0) {
+        tab <- out$read_table()
+        tab <- tab[, setdiff(names(tab), node$sort$temp_columns), drop = FALSE]
+        out <- as_record_batch_reader(tab)
+      }
+
       out
     },
     Write = function(node, ...) {
diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R
index 8c876c4d927..620be4c506f 100644
--- a/r/R/record-batch-reader.R
+++ b/r/R/record-batch-reader.R
@@ -56,6 +56,7 @@
 #'
 #' @rdname RecordBatchReader
 #' @name RecordBatchReader
+#' @export
 #' @include arrow-object.R
 #' @examples
 #' tf <- tempfile()
@@ -104,6 +105,16 @@ RecordBatchReader <- R6Class("RecordBatchReader",
     schema = function() RecordBatchReader__schema(self)
   )
 )
+RecordBatchReader$create <- function(..., batches = list(...), schema = NULL) {
+  are_batches <- map_lgl(batches, ~ inherits(., "RecordBatch"))
+  if (!all(are_batches)) {
+    stop(
+      "All inputs to RecordBatchReader$create must be RecordBatches",
+      call. = FALSE
+    )
+  }
+  RecordBatchReader__from_batches(batches, schema)
+}
 
 #' @export
 names.RecordBatchReader <- function(x) names(x$schema)
@@ -208,13 +219,13 @@ as_record_batch_reader.Table <- function(x, ...) {
 #' @rdname as_record_batch_reader
 #' @export
 as_record_batch_reader.RecordBatch <- function(x, ...) {
-  RecordBatchReader__from_batches(list(x), NULL)
+  RecordBatchReader$create(x, schema = x$schema)
 }
 
 #' @rdname as_record_batch_reader
 #' @export
 as_record_batch_reader.data.frame <- function(x, ...) {
-  as_record_batch_reader(as_record_batch(x))
+  RecordBatchReader$create(as_record_batch(x))
 }
 
 #' @rdname as_record_batch_reader
@@ -226,8 +237,8 @@ as_record_batch_reader.Dataset <- function(x, ...) {
 #' @rdname as_record_batch_reader
 #' @export
 as_record_batch_reader.arrow_dplyr_query <- function(x, ...) {
-  # TODO(ARROW-15271): make ExecPlan return RBR
-  as_record_batch_reader(collect.arrow_dplyr_query(x, as_data_frame = FALSE))
+  # TODO(ARROW-16607): use ExecPlan directly when it handles metadata
+  as_record_batch_reader(compute.arrow_dplyr_query(x))
 }
 
 #' @rdname as_record_batch_reader
diff --git a/r/R/record-batch.R b/r/R/record-batch.R
index 12fb570c4bb..c8579c11245 100644
--- a/r/R/record-batch.R
+++ b/r/R/record-batch.R
@@ -297,6 +297,12 @@ as_record_batch.Table <- function(x, ..., schema = NULL) {
   out
 }
 
+#' @rdname as_record_batch
+#' @export
+as_record_batch.arrow_dplyr_query <- function(x, ...) {
+  as_record_batch(compute.arrow_dplyr_query(x), ...)
+}
+
 #' @rdname as_record_batch
 #' @export
 as_record_batch.data.frame <- function(x, ..., schema = NULL) {
diff --git a/r/R/table.R b/r/R/table.R
index 8a23f59f069..c9789b8df8e 100644
--- a/r/R/table.R
+++ b/r/R/table.R
@@ -139,6 +139,13 @@ Table$create <- function(..., schema = NULL) {
   if (all_record_batches(dots)) {
     return(Table__from_record_batches(dots, schema))
   }
+  if (length(dots) == 1 && inherits(dots[[1]], c("RecordBatchReader", "RecordBatchFileReader"))) {
+    tab <- dots[[1]]$read_table()
+    if (!is.null(schema)) {
+      tab <- tab$cast(schema)
+    }
+    return(tab)
+  }
 
   # If any arrays are length 1, recycle them
   dots <- recycle_scalars(dots)
diff --git a/r/man/as_record_batch.Rd b/r/man/as_record_batch.Rd
index c8830c10717..9be4b32eb68 100644
--- a/r/man/as_record_batch.Rd
+++ b/r/man/as_record_batch.Rd
@@ -4,6 +4,7 @@
 \alias{as_record_batch}
 \alias{as_record_batch.RecordBatch}
 \alias{as_record_batch.Table}
+\alias{as_record_batch.arrow_dplyr_query}
 \alias{as_record_batch.data.frame}
 \title{Convert an object to an Arrow RecordBatch}
 \usage{
@@ -13,6 +14,8 @@ as_record_batch(x, ..., schema = NULL)
 
 \method{as_record_batch}{Table}(x, ..., schema = NULL)
 
+\method{as_record_batch}{arrow_dplyr_query}(x, ...)
+
 \method{as_record_batch}{data.frame}(x, ..., schema = NULL)
 }
 \arguments{
diff --git a/r/man/map_batches.Rd b/r/man/map_batches.Rd
index 08e7b86c057..eaeab6013a6 100644
--- a/r/man/map_batches.Rd
+++ b/r/man/map_batches.Rd
@@ -4,27 +4,37 @@
 \alias{map_batches}
 \title{Apply a function to a stream of RecordBatches}
 \usage{
-map_batches(X, FUN, ..., .data.frame = TRUE)
+map_batches(X, FUN, ..., .data.frame = NULL)
 }
 \arguments{
 \item{X}{A \code{Dataset} or \code{arrow_dplyr_query} object, as returned by the
 \code{dplyr} methods on \code{Dataset}.}
 
 \item{FUN}{A function or \code{purrr}-style lambda expression to apply to each
-batch}
+batch. It must return a RecordBatch or something coercible to one via
+\code{as_record_batch()}.}
 
 \item{...}{Additional arguments passed to \code{FUN}}
 
-\item{.data.frame}{logical: collect the resulting chunks into a single
-\code{data.frame}? Default \code{TRUE}}
+\item{.data.frame}{Deprecated argument, ignored}
+}
+\value{
+An \code{arrow_dplyr_query}.
 }
 \description{
 As an alternative to calling \code{collect()} on a \code{Dataset} query, you can
 use this function to access the stream of \code{RecordBatch}es in the \code{Dataset}.
-This lets you aggregate on each chunk and pull the intermediate results into
-a \code{data.frame} for further aggregation, even if you couldn't fit the whole
-\code{Dataset} result in memory.
+This lets you do more complex operations in R that operate on chunks of data
+without having to hold the entire Dataset in memory at once. You can include
+\code{map_batches()} in a dplyr pipeline and do additional dplyr methods on the
+stream of data in Arrow after it.
 }
 \details{
-This is experimental and not recommended for production use.
+Note that, unlike the core dplyr methods that are implemented in the Arrow
+query engine, \code{map_batches()} is not lazy: it starts evaluating on the data
+when you call it, even if you send its result to another pipeline function.
+
+This is experimental and not recommended for production use. It is also
+single-threaded and runs in R, not C++, so it won't be as fast as core
+Arrow methods.
 }
diff --git a/r/man/to_arrow.Rd b/r/man/to_arrow.Rd
index 4d300521011..ad534b06960 100644
--- a/r/man/to_arrow.Rd
+++ b/r/man/to_arrow.Rd
@@ -4,17 +4,13 @@
 \alias{to_arrow}
 \title{Create an Arrow object from others}
 \usage{
-to_arrow(.data, as_arrow_query = TRUE)
+to_arrow(.data)
 }
 \arguments{
 \item{.data}{the object to be converted}
-
-\item{as_arrow_query}{should the returned object be wrapped as an
-\code{arrow_dplyr_query}? (logical, default: \code{TRUE})}
 }
 \value{
-a \code{RecordBatchReader} object, wrapped as an arrow dplyr query which
-can be used in dplyr pipelines.
+A \code{RecordBatchReader}.
 }
 \description{
 This can be used in pipelines that pass data back and forth between Arrow and
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 5810320a057..9185da93be0 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -4484,7 +4484,7 @@ BEGIN_CPP11
 END_CPP11
 }
 // recordbatchreader.cpp
-std::shared_ptr<arrow::Table> RecordBatchReader__Head(const std::shared_ptr<arrow::RecordBatchReader>& reader, int64_t num_rows);
+std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__Head(const std::shared_ptr<arrow::RecordBatchReader>& reader, int64_t num_rows);
 extern "C" SEXP _arrow_RecordBatchReader__Head(SEXP reader_sexp, SEXP num_rows_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<arrow::RecordBatchReader>&>::type reader(reader_sexp);
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index 59fc37b32f0..fb173825f3b 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -67,7 +67,7 @@ std::shared_ptr<arrow::Table> Table__from_RecordBatchReader(
 }
 
 // [[arrow::export]]
-std::shared_ptr<arrow::Table> RecordBatchReader__Head(
+std::shared_ptr<arrow::RecordBatchReader> RecordBatchReader__Head(
     const std::shared_ptr<arrow::RecordBatchReader>& reader, int64_t num_rows) {
   std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
   std::shared_ptr<arrow::RecordBatch> this_batch;
@@ -77,7 +77,8 @@ std::shared_ptr<arrow::Table> RecordBatchReader__Head(
     batches.push_back(this_batch->Slice(0, num_rows));
     num_rows -= this_batch->num_rows();
   }
-  return ValueOrStop(arrow::Table::FromRecordBatches(reader->schema(), batches));
+  return ValueOrStop(
+      arrow::RecordBatchReader::Make(std::move(batches), reader->schema()));
 }
 
 // -------- RecordBatchStreamReader
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index 5b657148a57..e7553706624 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -700,9 +700,8 @@ test_that("Dataset min_rows_per_group", {
 
   ds <- open_dataset(dst_dir)
   row_group_sizes <- ds %>%
-    select() %>%
-    map_batches(~ .$num_rows, .data.frame = FALSE) %>%
-    unlist()
+    map_batches(~ record_batch(nrows = .$num_rows)) %>%
+    pull(nrows)
 
   index <- 1
   # We expect there to be 3 row groups since 11/5 = 2.2 and 11/4 = 2.75
@@ -739,9 +738,8 @@ test_that("Dataset write max rows per group", {
   file_path <- paste(dst_dir, written_files[[1]], sep = "/")
   ds <- open_dataset(file_path)
   row_group_sizes <- ds %>%
-    select() %>%
-    map_batches(~ .$num_rows, .data.frame = FALSE) %>%
-    unlist() %>% # Returns list because .data.frame is FALSE
+    map_batches(~ record_batch(nrows = .$num_rows)) %>%
+    pull(nrows) %>%
     sort()
 
   expect_equal(row_group_sizes, c(12, 18))
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index c61226d5161..7c208cd0908 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -625,17 +625,18 @@ test_that("map_batches", {
       filter(int > 5) %>%
       select(int, lgl) %>%
       map_batches(~ summarize(., min_int = min(int))) %>%
-      arrange(min_int),
+      arrange(min_int) %>%
+      collect(),
     tibble(min_int = c(6L, 101L))
   )
 
-  # $num_rows returns integer vector
+  # $num_rows returns integer vector, so we need to wrap it in a RecordBatch
   expect_equal(
     ds %>%
       filter(int > 5) %>%
       select(int, lgl) %>%
-      map_batches(~ .$num_rows, .data.frame = FALSE) %>%
-      unlist() %>% # Returns list because .data.frame is FALSE
+      map_batches(~ record_batch(nrows = .$num_rows)) %>%
+      pull(nrows) %>%
       sort(),
     c(5, 10)
   )
 
@@ -644,19 +645,35 @@
   expect_equal(
     ds %>%
       map_batches(~ count(., part)) %>%
-      arrange(part),
+      arrange(part) %>%
+      collect(),
     tibble(part = c(1, 2), n = c(10, 10))
   )
 
-  # $Take returns RecordBatch, which gets binded into a tibble
+  # $Take returns RecordBatch
   expect_equal(
     ds %>%
       filter(int > 5) %>%
       select(int, lgl) %>%
       map_batches(~ .$Take(0)) %>%
-      arrange(int),
+      arrange(int) %>%
+      collect(),
     tibble(int = c(6, 101), lgl = c(TRUE, TRUE))
   )
+
+  # Do things in R and put back into Arrow
+  expect_equal(
+    ds %>%
+      filter(int < 5) %>%
+      select(int) %>%
+      map_batches(
+        # as_mapper() can't handle %>%?
+        ~ mutate(as.data.frame(.), lets = letters[int])
+      ) %>%
+      arrange(int) %>%
+      collect(),
+    tibble(int = 1:4, lets = letters[1:4])
+  )
 })
 
 test_that("head/tail", {
diff --git a/r/tests/testthat/test-duckdb.R b/r/tests/testthat/test-duckdb.R
index 1df109cbf14..82451017a4a 100644
--- a/r/tests/testthat/test-duckdb.R
+++ b/r/tests/testthat/test-duckdb.R
@@ -188,7 +188,7 @@ test_that("to_arrow roundtrip, with dataset (without wrapping)", {
     to_duckdb() %>%
     select(-fct) %>%
     mutate(dbl_plus = dbl + 1) %>%
-    to_arrow(as_arrow_query = FALSE)
+    to_arrow()
 
   expect_r6_class(out, "RecordBatchReader")
 })
diff --git a/r/tests/testthat/test-record-batch-reader.R b/r/tests/testthat/test-record-batch-reader.R
index 4f5cc3c8b9f..597187da459 100644
--- a/r/tests/testthat/test-record-batch-reader.R
+++ b/r/tests/testthat/test-record-batch-reader.R
@@ -153,8 +153,14 @@ test_that("reader head method edge cases", {
   reader <- RecordBatchStreamReader$create(buf)
 
   expect_error(head(reader, -1)) # Not (yet) supported
-  expect_equal(head(reader, 0), Table$create(x = integer(0), y = character(0)))
-  expect_equal(head(reader, 100), Table$create(batch, batch))
+  expect_equal(
+    Table$create(head(reader, 0)),
+    Table$create(x = integer(0), y = character(0))
+  )
+  expect_equal(
+    Table$create(head(reader, 100)),
+    Table$create(batch, batch)
+  )
 })
 
 test_that("RBR methods", {
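
A minimal usage sketch of the behavior this diff enables, mirroring the new map_batches() test in r/tests/testthat/test-dataset.R above. It is not part of the diff itself and assumes `ds` is an already-opened Dataset with an integer column `int`; everything else shown is standard arrow/dplyr usage.

library(arrow)
library(dplyr)

# The lambda runs once per RecordBatch in R; its data.frame result is coerced
# back to a RecordBatch via as_record_batch(), so the rest of the pipeline
# continues on an Arrow stream until collect()
ds %>%
  filter(int < 5) %>%
  select(int) %>%
  map_batches(~ mutate(as.data.frame(.), lets = letters[int])) %>%
  arrange(int) %>%
  collect()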