
Conversion from (nano)arrow ? #64

Closed
eddelbuettel opened this issue Mar 5, 2023 · 9 comments

@eddelbuettel
Contributor

The Arrow C interface is bidirectional. Having the export to (nano)arrow Table and RecordBatch is useful. It would also be nice to be able to go the other way and take an Arrow schema and array_data pointer pair to instantiate a polars instance.

Are there any plans for this? (If there are, I apologize; I looked and only found export, not import.)
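
(For illustration: a minimal sketch of the schema/array pointer pair in question, using the nanoarrow R package rather than polars; the variable names are hypothetical.)

library(nanoarrow)

# export: wrap an R vector as an ArrowArray + ArrowSchema pointer pair
x <- c(1.5, 2.5, NA)
arr <- as_nanoarrow_array(x)
sch <- infer_nanoarrow_schema(x)

# these addresses are what a consumer such as polars would receive
nanoarrow_pointer_addr_chr(arr)
nanoarrow_pointer_addr_chr(sch)

# import: the other direction, materializing the same memory back into R
convert_array(arr, to = double())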

eddelbuettel changed the title from "Conversion _from_ (nano)arrow ?" to "Conversion from (nano)arrow ?" on Mar 5, 2023
@sorhawell
Collaborator

sorhawell commented Mar 6, 2023

Hi Dirk, thank you for asking :) I think that would correspond to the py-polars DataFrame.from_arrow() (and likewise for LazyFrame), which I need to implement for r-polars. I will try to do so right now.

@sorhawell
Collaborator

Uhh, I hit a steep bump on my learning path with pointer magic and the Arrow protocol. But now it works. It is 20 times faster than the current full-copy rpolars pl$DataFrame(). The implementation is very analogous to py-polars: it calls arrow:::ExportArray for every chunk of every column. If chunks are very small, it might slow down a bit. (A sketch of the per-chunk loop follows the timing below.)

Right now import speed is around 21 ms for 600k rows x 19 columns.

> library(rpolars)
> library(arrow)
> library(nycflights13)
> 
> 
> rbr = as_record_batch_reader(nycflights13::flights)
> big_arrow_table = do.call(rbind,lapply(1:2, \(x) arrow::arrow_table(nycflights13::flights)))
> rbr = as_record_batch_reader(big_arrow_table)
> 
> dim(big_arrow_table)
[1] 673552     19
> 
> # via arrow API
> system.time({df = rpolars:::rb_list_to_df(rbr$batches(),rbr$schema$names)})
   user  system elapsed 
  0.020   0.001   0.022 
> 
> big_df = as.data.frame(big_arrow_table)
> 
> # via r-polars conversion full copy
> system.time({df_simple = pl$DataFrame(big_df)})
   user  system elapsed 
  0.381   0.007   0.387 
> 
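
(A rough sketch of the per-chunk export loop described above, written against the R arrow API for illustration; the real implementation does the export on the Rust side.)

library(arrow)
library(nycflights13)

tbl <- arrow_table(nycflights13::flights)
for (name in names(tbl)) {
  ca <- tbl[[name]]            # each column is a ChunkedArray
  for (chunk in ca$chunks) {   # each chunk is one contiguous Array
    # here r-polars has arrow:::ExportArray() hand the chunk's schema and
    # array pointers across the C data interface, without copying buffers
  }
}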

@eddelbuettel
Contributor Author

Nice. And you immediately went for batches.

sorhawell mentioned this issue Mar 8, 2023
@sorhawell
Collaborator

sorhawell commented Mar 8, 2023

For future reference: this implementation calls the low-level ExportArray via R for each chunk (a contiguous array of memory). To speed it up further, one could use a high-level stream interface to avoid calling into R for each chunk. It should be something both Rust arrow2 and R arrow support.
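
(Illustration of that idea with the nanoarrow R package; the stream construction below is a hypothetical stand-in for what rpolars would do.)

library(nanoarrow)
library(nycflights13)

# a single ArrowArrayStream pointer crosses the FFI boundary; the consumer
# (e.g. a Rust arrow2 stream reader) then pulls each chunk itself, instead
# of R pushing every chunk through a separate ExportArray call
stream <- as_nanoarrow_array_stream(nycflights13::flights)
stream$get_schema()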

@kylebarron

Uhh, I hit a steep bump on my learning path with pointer magic and the Arrow protocol. But now it works. It is 20 times faster than the current full-copy rpolars pl$DataFrame().

The perfect use case for the C data interface!

If chunks are very small, it might slow down a bit.

IIRC py-polars tends to rechunk the data to avoid having lots of small batches.

@sorhawell
Collaborator

sorhawell commented Mar 9, 2023

Found another 2x speed-up by doing exactly as py-polars does: 200 ms for 8M rows and 20 mixed string and int columns (nycflights13::flights x 25).

IIRC py-polars tends to rechunk the data to avoid having lots of small batches

Yeah, in the end it will likely get rechunked. I was concerned that importing every single chunk before the rechunking would add considerable overhead. However, I benchmarked an input table of 500 chunks versus 20, and the latter was only 16% faster, so it is probably as good as it gets.
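
(A hypothetical way to set up that comparison; the helper below is illustrative, and the 16% figure comes from the measurement above, not from re-running this.)

library(arrow)
library(nycflights13)

# build an Arrow Table whose columns each have roughly n chunks
with_n_chunks <- function(df, n) {
  groups <- split(seq_len(nrow(df)), cut(seq_len(nrow(df)), n, labels = FALSE))
  do.call(rbind, lapply(groups, \(i) arrow_table(df[i, ])))
}

big_df  <- do.call(rbind, lapply(1:25, \(x) flights))
tbl_20  <- with_n_chunks(big_df, 20)
tbl_500 <- with_n_chunks(big_df, 500)
# importing tbl_500 into polars was only ~16% slower than tbl_20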

@kylebarron

kylebarron commented Mar 9, 2023

use a high-level stream interface to avoid calling into R for each chunk. It should be something both Rust arrow2 and R arrow support

Also, just to make sure you're aware: this seems like a great use case for the Arrow C Stream interface. arrow2 supports it here, and I have an example of using the arrow2 API here. I'm not familiar with R arrow, but nanoarrow says it supports that interface.
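
(Sketch of the consumer side of that interface, with the nanoarrow R package standing in for what the Rust side would do over FFI; hypothetical.)

library(nanoarrow)
library(nycflights13)

stream <- as_nanoarrow_array_stream(nycflights13::flights)

# this pull loop is what a stream reader on the consumer side does:
# get_next() returns NULL once the producer is exhausted
while (!is.null(batch <- stream$get_next())) {
  # each `batch` arrives as one chunk, imported without copying buffers
}
stream$release()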

@sorhawell
Collaborator

@kylebarron many thanks for the examples, they were helpful.

I got streaming of record batches from R arrow to arrow2 working; however, I see no option in R arrow or pyarrow to stream a chunked array as-is, without turning it into row-wise records/structs.

I posted a question about it on SO:
Can I produce a chunked-array stream from py-arrow and/or r-arrow and export it to rust?
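
(For illustration: nanoarrow's basic_array_stream() can at least hand-build such a stream on the R side; whether arrow or pyarrow will produce one directly from a ChunkedArray is the open question above. Hypothetical sketch.)

library(nanoarrow)

# a stream whose items are plain chunks of one column,
# not row-wise struct batches
chunk1 <- as_nanoarrow_array(c(1.5, 2.5))
chunk2 <- as_nanoarrow_array(c(3.5, NA))
col_stream <- basic_array_stream(list(chunk1, chunk2))
col_stream$get_next()  # first chunk, with no struct wrapping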

@sorhawell
Collaborator

sorhawell commented Mar 14, 2023

@eddelbuettel and @kylebarron, many thanks for the input.
pl$from_arrow() has been implemented now.
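
(A usage sketch, assuming pl$from_arrow() accepts an Arrow Table the way py-polars' from_arrow does; the exact accepted types are an assumption here.)

library(rpolars)
library(arrow)
library(nycflights13)

at <- arrow_table(nycflights13::flights)
df <- pl$from_arrow(at)  # assumed to import via the Arrow C data interface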

There was a fun, unexpected spin-off: in an upcoming version of rpolars, it is slightly faster to create an R arrow Table via an r-polars DataFrame plus nanoarrow, because r-polars can read from R memory multi-threaded (some ALTREP vectors are not supported ...).

library(rpolars)
library(arrow)
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(nycflights13)
library(nanoarrow)
library(bench)
big_df =  do.call(rbind,lapply(1:25, \(x) nycflights13::flights))
x = bench::mark(
  r_polars = pl$DataFrame(big_df,parallel = TRUE),
  r_to_polars_to_arrow = as_arrow_table(pl$DataFrame(big_df,parallel = TRUE)),
  r_to_arrow = as_arrow_table(big_df),
  check = FALSE
)
print(x)
#> # A tibble: 3 × 13
#>   expression              min median itr/s…¹ mem_a…² gc/se…³ n_itr  n_gc total…⁴
#>   <bch:expr>           <bch:> <bch:>   <dbl> <bch:b>   <dbl> <int> <dbl> <bch:t>
#> 1 r_polars              644ms  644ms    1.55  64.6MB       0     1     0   644ms
#> 2 r_to_polars_to_arrow  677ms  677ms    1.48  65.9MB       0     1     0   677ms
#> 3 r_to_arrow            987ms  987ms    1.01  93.9KB       0     1     0   987ms
#> # … with 4 more variables: result <list>, memory <list>, time <list>,
#> #   gc <list>, and abbreviated variable names ¹​`itr/sec`, ²​mem_alloc,
#> #   ³​`gc/sec`, ⁴​total_time

Created on 2023-03-14 with reprex v2.0.2
