Merged

92 commits (changes shown from all commits):
2eb3c9b  add test file for general compute (paleolimbot, Jun 17, 2022)
80293f7  scalar function creator (paleolimbot, Jun 17, 2022)
29d02d8  implement registration in R (paleolimbot, Jun 17, 2022)
190d059  sketch C++ UDF behaviour (paleolimbot, Jun 17, 2022)
5e3f682  working R execution (paleolimbot, Jun 17, 2022)
e129471  Update r/src/compute.cpp (paleolimbot, Jun 20, 2022)
83ad7ad  Update r/src/compute.cpp (paleolimbot, Jun 20, 2022)
94c0b2f  check Array argument (paleolimbot, Jun 20, 2022)
9e1a362  don't force arguments to Array (paleolimbot, Jun 20, 2022)
ddc0d46  remove unused code (paleolimbot, Jun 20, 2022)
fadf258  Update r/src/compute.cpp (paleolimbot, Jun 21, 2022)
2eb48ae  better names for variables (paleolimbot, Jun 21, 2022)
c29fc00  handle more cases on execution (paleolimbot, Jun 21, 2022)
b1c8cbf  use Resolver as an R function (paleolimbot, Jun 21, 2022)
cf98635  add a more user-friendly scalar function wrapper (paleolimbot, Jun 21, 2022)
c171da6  better fun resolution (paleolimbot, Jun 21, 2022)
e129028  add kernel state class (paleolimbot, Jun 21, 2022)
4631cb9  use ScalarKernel.data to keep function references (paleolimbot, Jun 21, 2022)
5b82d79  documentation for functions (paleolimbot, Jun 21, 2022)
99f7225  add the output type to the kernel context (paleolimbot, Jun 21, 2022)
80e5683  touch up (paleolimbot, Jun 21, 2022)
1e343a2  add reference page (paleolimbot, Jun 21, 2022)
df6ea0c  don't create lists in growables (paleolimbot, Jun 22, 2022)
36feaac  separate ExecPlan_prepare and ExecPlan_run (paleolimbot, Jun 22, 2022)
32e8d83  push as much exec plan execution into C++ as is possibe (paleolimbot, Jun 22, 2022)
8ff947b  test UDF in dplyr query (paleolimbot, Jun 22, 2022)
87402fc  clang-format (paleolimbot, Jun 22, 2022)
6d60921  maybe fixed sign compare error (paleolimbot, Jun 22, 2022)
e338f7d  limit scope on test (paleolimbot, Jun 22, 2022)
e0dd5c0  try to fix lintr errors (paleolimbot, Jun 22, 2022)
cdecb55  see if this example is the problem on 32-bit windows (paleolimbot, Jun 22, 2022)
65a5dc0  maybe fix on old windows (paleolimbot, Jun 22, 2022)
f5ec713  add larger-scale dataset test whilst executing a user-defined function (paleolimbot, Jul 6, 2022)
bb38274  better variable names in tests (paleolimbot, Jul 6, 2022)
565c5b5  base_scalar_function -> advanced_scalar_function (paleolimbot, Jul 6, 2022)
b96469b  get write_dataset() to work with user-defined function (paleolimbot, Jul 6, 2022)
0c139d1  better names for arrow_scalar_function() test (paleolimbot, Jul 6, 2022)
52880b1  change argument order for scalar function constructor (paleolimbot, Jul 7, 2022)
e8856b7  register_scalar_function -> register_user_defined_function (paleolimbot, Jul 7, 2022)
2665fdf  better argument names and inline comments (paleolimbot, Jul 7, 2022)
010ccf6  fix pkgdown reference (paleolimbot, Jul 7, 2022)
f732505  remove unused doc entry (paleolimbot, Jul 7, 2022)
4c01654  simplify detection of when we can and can't use SafeCallIntoR() (paleolimbot, Jul 7, 2022)
21a932a  abstract and document RunWithCapturedR usage (paleolimbot, Jul 7, 2022)
0c1b8cf  better failure mode for calling user-defined functions when we can't … (paleolimbot, Jul 7, 2022)
017f681  fix + clarify registration (paleolimbot, Jul 7, 2022)
85519a2  don't namespace rlang:: (paleolimbot, Jul 7, 2022)
9f251fc  clang-format (paleolimbot, Jul 7, 2022)
ebc0b84  constrain Arity specification to a fixed number of arguments (per fun… (paleolimbot, Jul 7, 2022)
ed735e1  Update r/src/compute.cpp (paleolimbot, Jul 8, 2022)
8877c12  Update r/src/compute.cpp (paleolimbot, Jul 8, 2022)
a89ce07  Update r/src/compute.cpp (paleolimbot, Jul 8, 2022)
f687451  clang-format (paleolimbot, Jul 8, 2022)
2e9e261  more readable path when RunWithCapturedR does not return a Result (paleolimbot, Jul 8, 2022)
514d91e  fix the void version of RunWithCapturedR (paleolimbot, Jul 8, 2022)
58c8573  fix kernel signature assignment (paleolimbot, Jul 8, 2022)
b4154af  fix for updated master (paleolimbot, Jul 8, 2022)
1ed6d25  inline some short variable definitions (paleolimbot, Jul 11, 2022)
88bf4d2  documentation updates (paleolimbot, Jul 11, 2022)
6f3d601  see if the lack of error on Windows is because it actually works (paleolimbot, Jul 11, 2022)
031ec64  test adding multiple kernels at once (paleolimbot, Jul 11, 2022)
0652ae0  cleaner handling of number of arguments in user-provided kernels (paleolimbot, Jul 11, 2022)
49261d6  improvements for readability and performance in safe-call-into-r.h (paleolimbot, Jul 11, 2022)
c1207eb  revert change to executor checking (paleolimbot, Jul 11, 2022)
72d650d  don't automatically cast output types (paleolimbot, Jul 11, 2022)
a1f8b53  document return value of advanced_fun (paleolimbot, Jul 11, 2022)
4acaa61  revert static variable change (paleolimbot, Jul 11, 2022)
7ccb23b  keep old error behaviour (paleolimbot, Jul 12, 2022)
6ff4fb2  fix one more safe call into R change (paleolimbot, Jul 12, 2022)
83aa148  unify test skipping based on whether or not we can runwithcapturedr a… (paleolimbot, Jul 12, 2022)
8a8955f  more skips aligned with run with captured R usage (paleolimbot, Jul 12, 2022)
0d3520a  nix the advanced interface (paleolimbot, Jul 14, 2022)
4ac9ec5  document auto_convert (paleolimbot, Jul 14, 2022)
ba34d1a  fix one more doc link (paleolimbot, Jul 14, 2022)
dfbdbc2  fix link in documentation (paleolimbot, Jul 14, 2022)
7fd6a77  back to the Python interface (paleolimbot, Jul 15, 2022)
b07e736  better example, fix pkgdown reference (paleolimbot, Jul 15, 2022)
c79aca5  fix doc (paleolimbot, Jul 15, 2022)
8d18754  maybe fix doc again (paleolimbot, Jul 15, 2022)
8d33b07  more doc fixes (paleolimbot, Jul 15, 2022)
abd938a  adapt for updated register_binding() (paleolimbot, Jul 16, 2022)
86c1c7e  improve comments in compute.R (paleolimbot, Jul 20, 2022)
652175f  maybe fix linter error (paleolimbot, Jul 20, 2022)
12b9721  better names for in_type/out_type sanitizers (paleolimbot, Jul 20, 2022)
259eed9  make sure an exec plan with head() works (paleolimbot, Jul 20, 2022)
1f8b248  clarify the `context` argument (paleolimbot, Jul 21, 2022)
510221f  don't allow rlang-style lambdas quite yet (paleolimbot, Jul 21, 2022)
e906632  check formals of fun (paleolimbot, Jul 21, 2022)
1805836  don't use glue::glue (paleolimbot, Jul 21, 2022)
aa9165f  Update r/tests/testthat/test-compute.R (paleolimbot, Jul 22, 2022)
7952710  revert very bad eager evaluation! (paleolimbot, Jul 22, 2022)
e31f2b1  fix test for plan with head() (paleolimbot, Jul 22, 2022)
3 changes: 3 additions & 0 deletions r/NAMESPACE
@@ -45,7 +45,9 @@ S3method(as_arrow_array,data.frame)
S3method(as_arrow_array,default)
S3method(as_arrow_array,pyarrow.lib.Array)
S3method(as_arrow_table,RecordBatch)
S3method(as_arrow_table,RecordBatchReader)
S3method(as_arrow_table,Table)
S3method(as_arrow_table,arrow_dplyr_query)
S3method(as_arrow_table,data.frame)
S3method(as_arrow_table,default)
S3method(as_arrow_table,pyarrow.lib.RecordBatch)
@@ -343,6 +345,7 @@ export(read_schema)
export(read_tsv_arrow)
export(record_batch)
export(register_extension_type)
export(register_scalar_function)
export(reregister_extension_type)
export(s3_bucket)
export(schema)
20 changes: 16 additions & 4 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

176 changes: 176 additions & 0 deletions r/R/compute.R
@@ -306,3 +306,179 @@ cast_options <- function(safe = TRUE, ...) {
  )
  modifyList(opts, list(...))
}

#' Register user-defined functions
#'
#' These functions support calling R code from query engine execution
#' (i.e., a [dplyr::mutate()] or [dplyr::filter()] on a [Table] or [Dataset]).
#' Use [register_scalar_function()] to attach Arrow input and output types to an
#' R function and make it available for use in the dplyr interface and/or
#' [call_function()]. Scalar functions are currently the only type of
#' user-defined function supported. In Arrow, scalar functions must be
#' stateless and return output with the same shape (i.e., the same number
#' of rows) as the input.
#'
#' @param name The function name to be used in the dplyr bindings
#' @param in_type A [DataType] of the input type or a [schema()]
#' for functions with more than one argument. This signature will be used
#' to determine if this function is appropriate for a given set of arguments.
#' If this function is appropriate for more than one signature, pass a
#' `list()` of the above.
#' @param out_type A [DataType] of the output type or a function accepting
#' a single argument (`types`), which is a `list()` of [DataType]s. If a
#' function, it must return a [DataType].
#' @param fun An R function or rlang-style lambda expression. The function
#' will be called with a first argument `context` which is a `list()`
Review comment (Member):
This first argument `context` feels like a real 🦶 🔫. A few questions:

  • Does it need to be first?
  • Does it need to be called context?
  • What does failure look like if I forget to include context as an arg to my function? (I'm guessing it's not pretty.) Can we detect up front if someone has forgotten to put context in the function? Something like checking that length(formals(fun)) == length(as_schema(in_type)) + 1 and raising a useful error message if the check fails?
  • You explain what it contains, but what do I do with it? Is there something I would do with batch_size or output_type?
  • You say above that functions need to be stateless, but what happens if I assign something into context in my function?

Reply (Member, author):

A previous version of this PR didn't require the `context` argument when the equivalent of `auto_convert` was `TRUE`, but that raised the question of "why two APIs?" (and I agree...one wrapper-function scheme is easier to remember).

In its current form, the context argument provides the information needed for auto_convert to do its magic. When auto_convert is TRUE, you could also use it to do something like runif(n = context$batch_size). The python version also provides the memory pool here but we don't provide a way to use the memory pool for constructing arrays, so I didn't add it to the context object.

Because it's a list(), assignments won't have any effect outside fun. A future version may be an environment to avoid the extra unwind protects needed to allocate a new list for each call (but could be one with an overridden [[<- to prevent modification).

I added some text to the documentation for fun and disallowed lambdas for now, since a potential future workaround could be to not pass the context argument for an rlang/purrr style lambda (e.g., ~.x + .y would be the equivalent of function(context, x, y) x + y). I hesitate to add too much convenience functionality in this PR since it's already rather unwieldy.

I added a length(formals(fun)) check...you're right that the error message was awful.
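To make that concrete, here is a minimal editorial sketch (not code from this PR) of a kernel that uses context$batch_size as described above; the name add_jitter and the jitter amount are invented, and register_scalar_function() is assumed to behave exactly as documented in this diff:

library(arrow)

# Hypothetical UDF: add a little noise to a float64 column. With
# auto_convert = TRUE, `x` arrives as a plain R vector and the return value
# is converted back via as_arrow_array() using context$output_type.
register_scalar_function(
  "add_jitter",
  function(context, x) {
    # context$batch_size is the number of rows in the batch being processed,
    # so the noise vector always matches the input length
    x + runif(n = context$batch_size, max = 0.001)
  },
  in_type = float64(),
  out_type = float64(),
  auto_convert = TRUE
)

call_function("add_jitter", Array$create(c(1, 2, 3)))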

#' with elements `batch_size` (the expected length of the output) and
#' `output_type` (the required [DataType] of the output) that may be used
#' to ensure that the output has the correct type and length. Subsequent
#' arguments are passed by position as specified by `in_type`. If
#' `auto_convert` is `TRUE`, subsequent arguments are converted to
#' R vectors before being passed to `fun` and the output is automatically
#' constructed with the expected output type via [as_arrow_array()].
#' @param auto_convert Use `TRUE` to convert inputs before passing to `fun`
Review comment (Member):
Should TRUE be default?

Reply (Member, author):

I envision it being a lot more common to use auto_convert = TRUE and went back and forth on the default value a few times. I went with this because (1) it's what the Python bindings do and (2) forcing a user to "opt-in" to the auto-convert behaviour at least clues them in that there's something magical going on, even if they don't understand exactly what it is. I don't really have strong feelings about this; I guess FALSE just seemed like a safer default.
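For illustration (again editorial, not part of the PR), the two settings ask the function author to write rather different code; the names below are invented and assume the API as documented here:

library(arrow)

# auto_convert = FALSE (the default): `x` arrives as an Arrow object and
# `fun` is responsible for returning an Arrow Array of context$output_type.
register_scalar_function(
  "times_two_arrow",
  function(context, x) {
    as_arrow_array(as.vector(x) * 2, type = context$output_type)
  },
  in_type = float64(),
  out_type = float64()
)

# auto_convert = TRUE: `x` arrives as a plain R vector and the result is
# converted to the declared out_type automatically.
register_scalar_function(
  "times_two_r",
  function(context, x) x * 2,
  in_type = float64(),
  out_type = float64(),
  auto_convert = TRUE
)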

#' and construct an Array of the correct type from the output. Use this
#' option to write functions of R objects as opposed to functions of
#' Arrow R6 objects.
#'
#' @return `NULL`, invisibly
#' @export
#'
#' @examplesIf arrow_with_dataset()
#' library(dplyr, warn.conflicts = FALSE)
#'
#' some_model <- lm(mpg ~ disp + cyl, data = mtcars)
#' register_scalar_function(
#' "mtcars_predict_mpg",
#' function(context, disp, cyl) {
#' predict(some_model, newdata = data.frame(disp, cyl))
#' },
#' in_type = schema(disp = float64(), cyl = float64()),
#' out_type = float64(),
#' auto_convert = TRUE
#' )
#'
#' as_arrow_table(mtcars) %>%
#' transmute(mpg, mpg_predicted = mtcars_predict_mpg(disp, cyl)) %>%
#' collect() %>%
#' head()
#'
register_scalar_function <- function(name, fun, in_type, out_type,
                                     auto_convert = FALSE) {
  assert_that(is.string(name))

  scalar_function <- arrow_scalar_function(
    fun,
    in_type,
    out_type,
    auto_convert = auto_convert
  )

  # register with Arrow C++ function registry (enables its use in
  # call_function() and Expression$create())
  RegisterScalarUDF(name, scalar_function)

  # register with dplyr binding (enables its use in mutate(), filter(), etc.)
  register_binding(
    name,
    function(...) build_expr(name, ...),
    update_cache = TRUE
  )

  invisible(NULL)
}
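As a usage note (an editorial sketch, not part of the diff; the name nchar_udf is invented), a function registered this way becomes callable from both entry points mentioned in the comments above:

library(arrow)
library(dplyr, warn.conflicts = FALSE)

register_scalar_function(
  "nchar_udf",
  function(context, x) nchar(x),
  in_type = utf8(),
  out_type = int32(),
  auto_convert = TRUE
)

# via the Arrow C++ function registry
call_function("nchar_udf", Array$create(c("a", "bb", "ccc")))

# via the dplyr binding
record_batch(x = c("a", "bb", "ccc")) %>%
  mutate(n = nchar_udf(x)) %>%
  collect()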

arrow_scalar_function <- function(fun, in_type, out_type, auto_convert = FALSE) {
  assert_that(is.function(fun))

  # Create a small wrapper function that is easier to call from C++.
  # TODO(ARROW-17148): This wrapper could be implemented in C/C++ to
  # reduce evaluation overhead and generate prettier backtraces when
  # errors occur (probably using a similar approach to purrr).
  if (auto_convert) {
    wrapper_fun <- function(context, args) {
      args <- lapply(args, as.vector)
      result <- do.call(fun, c(list(context), args))
      as_arrow_array(result, type = context$output_type)
    }
  } else {
    wrapper_fun <- function(context, args) {
      do.call(fun, c(list(context), args))
    }
  }

  # in_type can be a list() if registering multiple kernels at once
  if (is.list(in_type)) {
    in_type <- lapply(in_type, in_type_as_schema)
  } else {
    in_type <- list(in_type_as_schema(in_type))
  }

  # out_type can be a list() if registering multiple kernels at once
  if (is.list(out_type)) {
    out_type <- lapply(out_type, out_type_as_function)
  } else {
    out_type <- list(out_type_as_function(out_type))
  }

  # recycle out_type (which is frequently length 1 even if multiple kernels
  # are being registered at once)
  out_type <- rep_len(out_type, length(in_type))

  # check n_kernels and number of args in fun
  n_kernels <- length(in_type)
  if (n_kernels == 0) {
    abort("Can't register user-defined scalar function with 0 kernels")
  }

  expected_n_args <- in_type[[1]]$num_fields + 1L
  fun_formals_have_dots <- any(names(formals(fun)) == "...")
  if (!fun_formals_have_dots && length(formals(fun)) != expected_n_args) {
    abort(
      sprintf(
        paste0(
          "Expected `fun` to accept %d argument(s)\n",
          "but found a function that accepts %d argument(s)\n",
          "Did you forget to include `context` as the first argument?"
        ),
        expected_n_args,
        length(formals(fun))
      )
    )
  }

  structure(
    list(
      wrapper_fun = wrapper_fun,
      in_type = in_type,
      out_type = out_type
    ),
    class = "arrow_scalar_function"
  )
}

# This function sanitizes the in_type argument for arrow_scalar_function(),
# which can be a data type (e.g., int32()), a field for a unary function
# or a schema() for functions accepting more than one argument. C++ expects
# a schema().
in_type_as_schema <- function(x) {
  if (inherits(x, "Field")) {
    schema(x)
  } else if (inherits(x, "DataType")) {
    schema(field("", x))
  } else {
    as_schema(x)
  }
}
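For illustration, the three spellings this helper normalizes (it is internal, so the calls below are only meant to show the mapping, e.g. when run from inside the package namespace):

in_type_as_schema(int32())                           # bare DataType -> one-field schema with an empty name
in_type_as_schema(field("x", int32()))               # Field         -> schema containing that field
in_type_as_schema(schema(x = int32(), y = utf8()))   # schema        -> passed through as_schema()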

# This function sanitizes the out_type argument for arrow_scalar_function(),
# which can be a data type (e.g., int32()) or a function of the input types.
# C++ currently expects a function.
out_type_as_function <- function(x) {
  if (is.function(x)) {
    x
  } else {
    x <- as_data_type(x)
    function(types) x
  }
}
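Putting the two sanitizers together, a single name can carry several kernels, with out_type given as a function so the output type follows the input type. This is an editorial sketch (the name identity_udf is invented) assuming the API as documented above:

library(arrow)

register_scalar_function(
  "identity_udf",
  function(context, x) x,
  in_type = list(schema(x = int32()), schema(x = float64())),
  out_type = function(types) types[[1]],  # mirror the input type
  auto_convert = TRUE
)

call_function("identity_udf", Array$create(1:3))           # int32 in, int32 out
call_function("identity_udf", Array$create(c(1.5, 2.5)))   # float64 in, float64 out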
2 changes: 1 addition & 1 deletion r/R/dplyr-collect.R
@@ -20,7 +20,7 @@

collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) {
  tryCatch(
-    out <- as_record_batch_reader(x)$read_table(),
+    out <- as_arrow_table(x),
    # n = 4 because we want the error to show up as being from collect()
    # and not handle_csv_read_error()
    error = function(e, call = caller_env(n = 4)) {
47 changes: 40 additions & 7 deletions r/R/dplyr-funcs.R
@@ -50,20 +50,28 @@ NULL
#' - `fun`: string function name
#' - `data`: `Expression` (these are all currently a single field)
#' - `options`: list of function options, as passed to call_function
#' @param update_cache Update .cache$functions at the time of registration.
Review comment (Member):
Newbie question, but what is this cache exactly? ".cache$functions" isn't exactly informative...

Reply (Member, author):

It's an excellent question...as far as I know, nobody remembers why this cache exists; I just didn't want to poke that bear here (I'll add a note about that).

#' the default is FALSE because the majority of usage is to register
#' bindings at package load, after which we create the cache once. The
#' reason why .cache$functions is needed in addition to nse_funcs for
#' non-aggregate functions could be revisited...it is currently used
#' as the data mask in mutate, filter, and aggregate (but not
#' summarise) because the data mask has to be a list.
#' @param registry An environment in which the functions should be
#' assigned.
#'
#' @return The previously registered binding or `NULL` if no previously
#' registered function existed.
#' @keywords internal
#'
-register_binding <- function(fun_name, fun, registry = nse_funcs) {
+register_binding <- function(fun_name, fun, registry = nse_funcs,
+                             update_cache = FALSE) {
  unqualified_name <- sub("^.*?:{+}", "", fun_name)

  previous_fun <- registry[[unqualified_name]]

  # if the unqualified name exists in the registry, warn
-  if (!is.null(fun) && !is.null(previous_fun)) {
+  if (!is.null(previous_fun)) {
    warn(
      paste0(
        "A \"",
@@ -73,11 +81,36 @@ register_binding <- function(fun_name, fun, registry = nse_funcs) {
  }

  # register both as `pkg::fun` and as `fun` if `qualified_name` is prefixed
-  if (grepl("::", fun_name)) {
-    registry[[unqualified_name]] <- fun
-    registry[[fun_name]] <- fun
-  } else {
-    registry[[unqualified_name]] <- fun
+  # unqualified_name and fun_name will be the same if not prefixed
+  registry[[unqualified_name]] <- fun
+  registry[[fun_name]] <- fun

  if (update_cache) {
    fun_cache <- .cache$functions
    fun_cache[[unqualified_name]] <- fun
    fun_cache[[fun_name]] <- fun
    .cache$functions <- fun_cache
  }

  invisible(previous_fun)
}

unregister_binding <- function(fun_name, registry = nse_funcs,
                               update_cache = FALSE) {
  unqualified_name <- sub("^.*?:{+}", "", fun_name)
  previous_fun <- registry[[unqualified_name]]

  rm(
    list = unique(c(fun_name, unqualified_name)),
    envir = registry,
    inherits = FALSE
  )

  if (update_cache) {
    fun_cache <- .cache$functions
    fun_cache[[unqualified_name]] <- NULL
    fun_cache[[fun_name]] <- NULL
    .cache$functions <- fun_cache
  }

  invisible(previous_fun)
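A hedged sketch of how these helpers fit together in practice (they are internal, so the arrow::: access below is purely illustrative, e.g. what a test might do; "mypkg::shout" is an invented name):

arrow:::register_binding(
  "mypkg::shout",
  function(x) arrow:::build_expr("utf8_upper", x),
  update_cache = TRUE  # also expose it through the mutate()/filter() data mask
)

# later (e.g., test teardown), remove it again and drop it from the cache
arrow:::unregister_binding("mypkg::shout", update_cache = TRUE)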
4 changes: 2 additions & 2 deletions r/R/feather.R
@@ -190,7 +190,7 @@ FeatherReader <- R6Class("FeatherReader",
  inherit = ArrowObject,
  public = list(
    Read = function(columns) {
-      ipc___feather___Reader__Read(self, columns, on_old_windows())
+      ipc___feather___Reader__Read(self, columns)
    },
    print = function(...) {
      cat("FeatherReader:\n")
@@ -211,5 +211,5 @@ names.FeatherReader <- function(x) x$column_names

FeatherReader$create <- function(file) {
  assert_is(file, "RandomAccessFile")
-  ipc___feather___Reader__Open(file, on_old_windows())
+  ipc___feather___Reader__Open(file)
}