Skip to content

Commit

Permalink
feat(r): Add bindings to IPC writer (#608)
Browse files Browse the repository at this point in the history
This PR adds a basic level of support for IPC writing in the R package.
This is basically a thin wrapper around `ArrowIpcWriterWriteStream()`
and could be more feature-rich like the Python version (that allows
schemas and batches to be written individually).

I also added a bit of code to handle interrupts (which should catch
interrupts on read and write and wasn't handled before).

``` r
library(nanoarrow)
tf <- tempfile()

nycflights13::flights |> write_nanoarrow(tf)
(df <- tf |> read_nanoarrow() |> tibble::as_tibble())
#> # A tibble: 336,776 × 19
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      517            515         2      830            819
#>  2  2013     1     1      533            529         4      850            830
#>  3  2013     1     1      542            540         2      923            850
#>  4  2013     1     1      544            545        -1     1004           1022
#>  5  2013     1     1      554            600        -6      812            837
#>  6  2013     1     1      554            558        -4      740            728
#>  7  2013     1     1      555            600        -5      913            854
#>  8  2013     1     1      557            600        -3      709            723
#>  9  2013     1     1      557            600        -3      838            846
#> 10  2013     1     1      558            600        -2      753            745
#> # ℹ 336,766 more rows
#> # ℹ 11 more variables: arr_delay <dbl>, carrier <chr>, flight <int>,
#> #   tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
#> #   hour <dbl>, minute <dbl>, time_hour <dttm>
identical(df, nycflights13::flights)
#> [1] TRUE
```

<sup>Created on 2024-09-14 with [reprex
v2.1.1](https://reprex.tidyverse.org)</sup>
  • Loading branch information
paleolimbot authored Sep 17, 2024
1 parent e18bd38 commit 396d851
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 21 deletions.
2 changes: 1 addition & 1 deletion r/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Description: Provides an 'R' interface to the 'nanoarrow' 'C' library and the
License: Apache License (>= 2)
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3
RoxygenNote: 7.3.2
URL: https://arrow.apache.org/nanoarrow/latest/r/, https://github.com/apache/arrow-nanoarrow
BugReports: https://github.com/apache/arrow-nanoarrow/issues
Suggests:
Expand Down
3 changes: 3 additions & 0 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ S3method(str,nanoarrow_array_stream)
S3method(str,nanoarrow_buffer)
S3method(str,nanoarrow_schema)
S3method(str,nanoarrow_vctr)
S3method(write_nanoarrow,character)
S3method(write_nanoarrow,connection)
export(array_stream_set_finalizer)
export(as_nanoarrow_array)
export(as_nanoarrow_array_extension)
Expand Down Expand Up @@ -210,6 +212,7 @@ export(read_nanoarrow)
export(register_nanoarrow_extension)
export(resolve_nanoarrow_extension)
export(unregister_nanoarrow_extension)
export(write_nanoarrow)
importFrom(utils,getFromNamespace)
importFrom(utils,str)
useDynLib(nanoarrow, .registration = TRUE)
72 changes: 66 additions & 6 deletions r/R/ipc.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
# specific language governing permissions and limitations
# under the License.

#' Read serialized streams of Arrow data
#' Read/write serialized streams of Arrow data
#'
#' Reads connections, file paths, URLs, or raw vectors of serialized Arrow
#' data. Arrow documentation typically refers to this format as "Arrow IPC",
#' since its origin was as a means to transmit tables between processes
#' Reads/writes connections, file paths, URLs, or raw vectors from/to serialized
#' Arrow data. Arrow documentation typically refers to this format as "Arrow
#' IPC", since its origin was as a means to transmit tables between processes
#' (e.g., multiple R sessions). This format can also be written to and read
#' from files or URLs and is essentially a high performance equivalent of
#' a CSV file that does a better job maintaining types.
#'
#' The nanoarrow package does not currently have the ability to write serialized
#' IPC data: use [arrow::write_ipc_stream()] to write data from R, or use
#' The nanoarrow package implements an IPC writer; however, you can also
#' use [arrow::write_ipc_stream()] to write data from R, or use
#' the equivalent writer from another Arrow implementation in Python, C++,
#' Rust, JavaScript, Julia, C#, and beyond.
#'
Expand All @@ -35,6 +35,8 @@
#' @param x A `raw()` vector, connection, or file path from which to read
#' binary data. Common extensions indicating compression (.gz, .bz2, .zip)
#' are automatically uncompressed.
#' @param data An object to write as an Arrow IPC stream, converted using
#' [as_nanoarrow_array_stream()]. Notably, this includes a [data.frame()].
#' @param lazy By default, `read_nanoarrow()` will read and discard a copy of
#' the reader's schema to ensure that invalid streams are discovered as
#' soon as possible. Use `lazy = TRUE` to defer this check until the reader
Expand Down Expand Up @@ -107,6 +109,42 @@ read_nanoarrow.connection <- function(x, ..., lazy = FALSE) {
check_stream_if_requested(reader, lazy)
}

#' @rdname read_nanoarrow
#' @export
write_nanoarrow <- function(data, x, ...) {
UseMethod("write_nanoarrow", x)
}

#' @export
write_nanoarrow.connection <- function(data, x, ...) {
if (!isOpen(x)) {
open(x, "wb")
on.exit(close(x))
}

writer <- .Call(nanoarrow_c_ipc_writer_connection, x)
stream <- as_nanoarrow_array_stream(data)
on.exit(nanoarrow_pointer_release(stream), add = TRUE)

.Call(nanoarrow_c_ipc_writer_write_stream, writer, stream)
invisible(data)
}

#' @export
write_nanoarrow.character <- function(data, x, ...) {
if (length(x) != 1) {
stop(sprintf("Can't interpret character(%d) as file path", length(x)))
}

con_type <- guess_connection_type(x)
if (con_type == "unz") {
stop("zip compression not supported for write_nanoarrow()")
}

con <- do.call(con_type, list(x))
write_nanoarrow(data, con)
}

#' @rdname read_nanoarrow
#' @export
example_ipc_stream <- function() {
Expand Down Expand Up @@ -205,3 +243,25 @@ guess_zip_filename <- function(x) {

files
}

# The C-level R_tryCatch() does not provide for handling interrupts (or
# I couldn't figure out how to make it work), so instead we provide wrappers
# around readBin() and writeBin() that convert interrupt conditions to errors
# (which the C code does know how to handle).
read_bin_wrapper <- function(con, what, n) {
withCallingHandlers(
readBin(con, what, n),
interrupt = function(e) {
stop("user interrupt")
}
)
}

write_bin_wrapper <- function(object, con) {
withCallingHandlers(
writeBin(object, con),
interrupt = function(e) {
stop("user interrupt")
}
)
}
18 changes: 12 additions & 6 deletions r/man/read_nanoarrow.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions r/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ extern SEXP nanoarrow_c_infer_ptype(SEXP schema_xptr);
extern SEXP nanoarrow_c_convert_array(SEXP array_xptr, SEXP ptype_sexp);
extern SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr);
extern SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con);
extern SEXP nanoarrow_c_ipc_writer_connection(SEXP con);
extern SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP array_stream_xptr);
extern SEXP nanoarrow_c_allocate_schema(void);
extern SEXP nanoarrow_c_allocate_array(void);
extern SEXP nanoarrow_c_allocate_array_stream(void);
Expand Down Expand Up @@ -136,6 +138,9 @@ static const R_CallMethodDef CallEntries[] = {
1},
{"nanoarrow_c_ipc_array_reader_connection",
(DL_FUNC)&nanoarrow_c_ipc_array_reader_connection, 1},
{"nanoarrow_c_ipc_writer_connection", (DL_FUNC)&nanoarrow_c_ipc_writer_connection, 1},
{"nanoarrow_c_ipc_writer_write_stream", (DL_FUNC)&nanoarrow_c_ipc_writer_write_stream,
2},
{"nanoarrow_c_allocate_schema", (DL_FUNC)&nanoarrow_c_allocate_schema, 0},
{"nanoarrow_c_allocate_array", (DL_FUNC)&nanoarrow_c_allocate_array, 0},
{"nanoarrow_c_allocate_array_stream", (DL_FUNC)&nanoarrow_c_allocate_array_stream, 0},
Expand Down
149 changes: 145 additions & 4 deletions r/src/ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <stdint.h>
#define R_NO_REMAP
#include <R.h>
#include <Rinternals.h>
Expand Down Expand Up @@ -48,6 +49,50 @@ static SEXP input_stream_owning_xptr(void) {
return input_stream_xptr;
}

static void finalize_output_stream_xptr(SEXP output_stream_xptr) {
struct ArrowIpcOutputStream* output_stream =
(struct ArrowIpcOutputStream*)R_ExternalPtrAddr(output_stream_xptr);
if (output_stream != NULL && output_stream->release != NULL) {
output_stream->release(output_stream);
}

if (output_stream != NULL) {
ArrowFree(output_stream);
}
}

static SEXP output_stream_owning_xptr(void) {
struct ArrowIpcOutputStream* output_stream =
(struct ArrowIpcOutputStream*)ArrowMalloc(sizeof(struct ArrowIpcOutputStream));
output_stream->release = NULL;
SEXP output_stream_xptr =
PROTECT(R_MakeExternalPtr(output_stream, R_NilValue, R_NilValue));
R_RegisterCFinalizer(output_stream_xptr, &finalize_output_stream_xptr);
UNPROTECT(1);
return output_stream_xptr;
}

static void finalize_writer_xptr(SEXP writer_xptr) {
struct ArrowIpcWriter* writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr);
if (writer != NULL && writer->private_data != NULL) {
ArrowIpcWriterReset(writer);
}

if (writer != NULL) {
ArrowFree(writer);
}
}

static SEXP writer_owning_xptr(void) {
struct ArrowIpcWriter* writer =
(struct ArrowIpcWriter*)ArrowMalloc(sizeof(struct ArrowIpcWriter));
writer->private_data = NULL;
SEXP writer_xptr = PROTECT(R_MakeExternalPtr(writer, R_NilValue, R_NilValue));
R_RegisterCFinalizer(writer_xptr, &finalize_writer_xptr);
UNPROTECT(1);
return writer_xptr;
}

SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr) {
struct ArrowBuffer* buffer = buffer_from_xptr(buffer_xptr);

Expand Down Expand Up @@ -82,7 +127,7 @@ struct ConnectionInputStreamHandler {
int return_code;
};

static SEXP handle_readbin_error(SEXP cond, void* hdata) {
static SEXP handle_readbin_writebin_error(SEXP cond, void* hdata) {
struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata;

SEXP fun = PROTECT(Rf_install("conditionMessage"));
Expand All @@ -103,7 +148,7 @@ static SEXP call_readbin(void* hdata) {
SEXP n = PROTECT(Rf_ScalarReal((double)data->buf_size_bytes));
SEXP call = PROTECT(Rf_lang4(nanoarrow_sym_readbin, data->con, nanoarrow_ptype_raw, n));

SEXP result = PROTECT(Rf_eval(call, R_BaseEnv));
SEXP result = PROTECT(Rf_eval(call, nanoarrow_ns_pkg));
R_xlen_t bytes_read = Rf_xlength(result);
memcpy(data->buf, RAW(result), bytes_read);
*(data->size_read_out) = bytes_read;
Expand All @@ -112,6 +157,36 @@ static SEXP call_readbin(void* hdata) {
return R_NilValue;
}

static SEXP call_writebin(void* hdata) {
struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata;

// Write 16MB chunks. This a balance between being small enough not to
// copy too much of the source unnecessarily and big enough to avoid
// unnecessary R evaluation overhead.
int64_t chunk_buffer_size = 16777216;
SEXP chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, chunk_buffer_size));
SEXP call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con));
while (data->buf_size_bytes > chunk_buffer_size) {
memcpy(RAW(chunk_buffer), data->buf, chunk_buffer_size);
Rf_eval(call, nanoarrow_ns_pkg);
data->buf_size_bytes -= chunk_buffer_size;
data->buf += chunk_buffer_size;
}

UNPROTECT(2);

// Write remaining bytes
if (data->buf_size_bytes > 0) {
chunk_buffer = PROTECT(Rf_allocVector(RAWSXP, data->buf_size_bytes));
call = PROTECT(Rf_lang3(nanoarrow_sym_writebin, chunk_buffer, data->con));
memcpy(RAW(chunk_buffer), data->buf, data->buf_size_bytes);
Rf_eval(call, nanoarrow_ns_pkg);
UNPROTECT(2);
}

return R_NilValue;
}

static ArrowErrorCode read_con_input_stream(struct ArrowIpcInputStream* stream,
uint8_t* buf, int64_t buf_size_bytes,
int64_t* size_read_out,
Expand All @@ -129,14 +204,43 @@ static ArrowErrorCode read_con_input_stream(struct ArrowIpcInputStream* stream,
data.error = error;
data.return_code = NANOARROW_OK;

R_tryCatchError(&call_readbin, &data, &handle_readbin_error, &data);
R_tryCatchError(&call_readbin, &data, &handle_readbin_writebin_error, &data);
return data.return_code;
}

static ArrowErrorCode write_con_output_stream(struct ArrowIpcOutputStream* stream,
const void* buf, int64_t buf_size_bytes,
int64_t* size_write_out,
struct ArrowError* error) {
if (!nanoarrow_is_main_thread()) {
ArrowErrorSet(error, "Can't read from R connection on a non-R thread");
return EIO;
}

struct ConnectionInputStreamHandler data;
data.con = (SEXP)stream->private_data;
data.buf = (void*)buf;
data.buf_size_bytes = buf_size_bytes;
data.size_read_out = NULL;
data.error = error;
data.return_code = NANOARROW_OK;

R_tryCatchError(&call_writebin, &data, &handle_readbin_writebin_error, &data);

// This implementation always blocks until all bytes have been written
*size_write_out = buf_size_bytes;

return data.return_code;
}

static void release_con_input_stream(struct ArrowIpcInputStream* stream) {
nanoarrow_release_sexp((SEXP)stream->private_data);
}

static void release_con_output_stream(struct ArrowIpcOutputStream* stream) {
nanoarrow_release_sexp((SEXP)stream->private_data);
}

SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) {
SEXP array_stream_xptr = PROTECT(nanoarrow_array_stream_owning_xptr());
struct ArrowArrayStream* array_stream =
Expand All @@ -153,9 +257,46 @@ SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) {

int code = ArrowIpcArrayStreamReaderInit(array_stream, input_stream, NULL);
if (code != NANOARROW_OK) {
Rf_error("ArrowIpcArrayStreamReaderInit() failed");
Rf_error("ArrowIpcArrayStreamReaderInit() failed with errno %d", code);
}

UNPROTECT(2);
return array_stream_xptr;
}

SEXP nanoarrow_c_ipc_writer_connection(SEXP con) {
SEXP output_stream_xptr = PROTECT(output_stream_owning_xptr());
struct ArrowIpcOutputStream* output_stream =
(struct ArrowIpcOutputStream*)R_ExternalPtrAddr(output_stream_xptr);

output_stream->write = &write_con_output_stream;
output_stream->release = &release_con_output_stream;
output_stream->private_data = (SEXP)con;
nanoarrow_preserve_sexp(con);

SEXP writer_xptr = PROTECT(writer_owning_xptr());
struct ArrowIpcWriter* writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr);

int code = ArrowIpcWriterInit(writer, output_stream);
if (code != NANOARROW_OK) {
Rf_error("ArrowIpcWriterInit() failed with errno %d", code);
}

UNPROTECT(2);
return writer_xptr;
}

SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP array_stream_xptr) {
struct ArrowIpcWriter* writer = (struct ArrowIpcWriter*)R_ExternalPtrAddr(writer_xptr);
struct ArrowArrayStream* array_stream =
nanoarrow_array_stream_from_xptr(array_stream_xptr);

struct ArrowError error;
ArrowErrorInit(&error);
int code = ArrowIpcWriterWriteArrayStream(writer, array_stream, &error);
if (code != NANOARROW_OK) {
Rf_error("ArrowIpcWriterWriteArrayStream() failed: %s", error.message);
}

return R_NilValue;
}
Loading

0 comments on commit 396d851

Please sign in to comment.