-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: import_stream internal method for Series to support Arrow C stream interface #1078
Conversation
as_polars_series()
and as_polars_df()
as_polars_series()
and as_polars_df()
as_polars_series()
and as_polars_df()
for nanoarrow_array_stream
…ow::RecordBatchReader>)` and remove unused internal functions
as_polars_series()
and as_polars_df()
for nanoarrow_array_streamas_polars_series()
and as_polars_df()
The rechunk option of In both cases, the case of using nanoarrow_array_stream as the intermediate format, which performs the conversion via the Arrow C Stream interface only, is the fastest, so perhaps it may be worth eliminating the rechunk option and rewriting the code completely to use only the Arrow C Stream. The latest release Detailslibrary(polars)
library(nanoarrow)
library(arrow, warn.conflicts = FALSE)
polars_info()
#> Polars R package version : 0.16.3
#> Rust Polars crate version: 0.39.2
#>
#> Thread pool size: 16
#>
#> Features:
#> default TRUE
#> full_features TRUE
#> disable_limit_max_threads TRUE
#> nightly TRUE
#> sql TRUE
#> rpolars_debug_print FALSE
#>
#> Code completion: deactivated
arrow_info()
#> Arrow package version: 15.0.1
#>
#> Capabilities:
#>
#> acero TRUE
#> dataset TRUE
#> substrait FALSE
#> parquet TRUE
#> json TRUE
#> s3 TRUE
#> gcs TRUE
#> utf8proc TRUE
#> re2 TRUE
#> snappy TRUE
#> gzip TRUE
#> brotli TRUE
#> zstd TRUE
#> lz4 TRUE
#> lz4_frame TRUE
#> lzo FALSE
#> bz2 TRUE
#> jemalloc TRUE
#> mimalloc TRUE
#>
#> Memory:
#>
#> Allocator jemalloc
#> Current 0 bytes
#> Max 0 bytes
#>
#> Runtime:
#>
#> SIMD Level avx2
#> Detected SIMD Level avx2
#>
#> Build:
#>
#> C++ Library Version 15.0.1
#> C++ Compiler GNU
#> C++ Compiler Version 11.4.0
big_at = do.call(rbind, lapply(1:5, \(x) as_arrow_table(nycflights13::flights)))
bench::mark(
rechunk = as_polars_df(big_at, rechunk = TRUE),
not_rechunk = as_polars_df(big_at, rechunk = FALSE),
via_nanoarrow = as_polars_df(as_nanoarrow_array_stream(big_at)),
via_r = as_polars_df(as.data.frame(big_at)),
check = FALSE,
min_iterations = 5
)
#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 4 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 rechunk 178.9ms 193.55ms 5.12 6.55MB 2.05
#> 2 not_rechunk 173.72ms 223.31ms 4.67 229.05KB 2.80
#> 3 via_nanoarrow 159.8ms 164.03ms 5.81 1.75MB 0
#> 4 via_r 1.37s 1.45s 0.671 167.41MB 0.671 Created on 2024-05-06 with reprex v2.1.0 Detailslibrary(polars)
library(nanoarrow)
library(arrow, warn.conflicts = FALSE)
polars_info()
#> Polars R package version : 0.16.3.9000
#> Rust Polars crate version: 0.39.2
#>
#> Thread pool size: 16
#>
#> Features:
#> default TRUE
#> full_features TRUE
#> disable_limit_max_threads TRUE
#> nightly TRUE
#> sql TRUE
#> rpolars_debug_print FALSE
#>
#> Code completion: deactivated
arrow_info()
#> Arrow package version: 15.0.1
#>
#> Capabilities:
#>
#> acero TRUE
#> dataset TRUE
#> substrait FALSE
#> parquet TRUE
#> json TRUE
#> s3 TRUE
#> gcs TRUE
#> utf8proc TRUE
#> re2 TRUE
#> snappy TRUE
#> gzip TRUE
#> brotli TRUE
#> zstd TRUE
#> lz4 TRUE
#> lz4_frame TRUE
#> lzo FALSE
#> bz2 TRUE
#> jemalloc TRUE
#> mimalloc TRUE
#>
#> Memory:
#>
#> Allocator jemalloc
#> Current 0 bytes
#> Max 0 bytes
#>
#> Runtime:
#>
#> SIMD Level avx2
#> Detected SIMD Level avx2
#>
#> Build:
#>
#> C++ Library Version 15.0.1
#> C++ Compiler GNU
#> C++ Compiler Version 11.4.0
big_at = do.call(rbind, lapply(1:5, \(x) as_arrow_table(nycflights13::flights)))
bench::mark(
rechunk = as_polars_df(big_at, rechunk = TRUE),
not_rechunk = as_polars_df(big_at, rechunk = FALSE),
via_nanoarrow = as_polars_df(as_nanoarrow_array_stream(big_at)),
via_r = as_polars_df(as.data.frame(big_at)),
check = FALSE,
min_iterations = 5
)
#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 4 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 rechunk 244.69ms 291.57ms 3.43 9.69MB 0.686
#> 2 not_rechunk 251.56ms 275.39ms 3.42 47.88KB 0.684
#> 3 via_nanoarrow 184.96ms 203.89ms 4.65 126.65KB 0
#> 4 via_r 1.55s 1.77s 0.575 167.48MB 0.920 Created on 2024-05-06 with reprex v2.1.0 |
as_polars_series()
and as_polars_df()
as_polars_series()
and as_polars_df()
as_polars_series()
and as_polars_df()
as_polars_series()
and as_polars_df()
Benchmarking showed no difference in speed between the existing conversion copied from Python Polars and the conversion via the new Arrow C Stream interface, so I changed the conversion from |
I don't know why, but it seems that rechunking is automatically done when a Struct type Stream is converted to a Series? library(nanoarrow)
library(polars)
s_int <- basic_array_stream(list(as_nanoarrow_array(1:5), as_nanoarrow_array(6:10))) |>
as_polars_series()
s_struct <- basic_array_stream(list(as_nanoarrow_array(mtcars[1:5, ]), as_nanoarrow_array(mtcars[6:10, ]))) |>
as_polars_series()
s_int$n_chunks()
#> [1] 2
s_struct$n_chunks()
#> [1] 1 Created on 2024-05-06 with reprex v2.1.0 |
Using the internal functions of the release version, rechunking has not been done, so something could be wrong.......
|
I think this is because the conversion was done on a chunk-by-chunk basis to Series, but now it is done in chunks, which is not a problem considering that Python Polars also do the conversion in chunks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't follow your recent changes very closely, but based on your benchmarks above it seems all conversions have slowed down compared to the release version. Is this expected?
Thanks for your review.
I don't know the cause either, but it may be related to the fact that the development version of the benchmark is running in a development container and the release version of the benchmark is using a different, newer container. In any case, I am running the benchmark on a machine that is simultaneously running other software such as a browser, etc. and the results seem to vary quite a bit, so I suspect it is an effect of variability. |
as_polars_series()
and as_polars_df()
…face" This reverts commit 350689c.
…nside the function
Hmmm, that is really strange the performance of the conversion from nanoarrow_array_stream has degraded. I added a new feature in e04762c to switch the conversion method from nanoarrow_array_stream, but both are displayed at the same speed. Detailslibrary(polars)
library(nanoarrow)
library(arrow, warn.conflicts = FALSE)
polars_info()
#> Polars R package version : 0.16.3.9000
#> Rust Polars crate version: 0.39.2
#>
#> Thread pool size: 16
#>
#> Features:
#> default TRUE
#> full_features TRUE
#> disable_limit_max_threads TRUE
#> nightly TRUE
#> sql TRUE
#> rpolars_debug_print FALSE
#>
#> Code completion: deactivated
arrow_info()
#> Arrow package version: 15.0.1
#>
#> Capabilities:
#>
#> acero TRUE
#> dataset TRUE
#> substrait FALSE
#> parquet TRUE
#> json TRUE
#> s3 TRUE
#> gcs TRUE
#> utf8proc TRUE
#> re2 TRUE
#> snappy TRUE
#> gzip TRUE
#> brotli TRUE
#> zstd TRUE
#> lz4 TRUE
#> lz4_frame TRUE
#> lzo FALSE
#> bz2 TRUE
#> jemalloc TRUE
#> mimalloc TRUE
#>
#> Memory:
#>
#> Allocator jemalloc
#> Current 0 bytes
#> Max 0 bytes
#>
#> Runtime:
#>
#> SIMD Level avx2
#> Detected SIMD Level avx2
#>
#> Build:
#>
#> C++ Library Version 15.0.1
#> C++ Compiler GNU
#> C++ Compiler Version 11.4.0
big_at = do.call(rbind, lapply(1:5, \(x) as_arrow_table(nycflights13::flights)))
bench::mark(
not_rechunk = as_polars_df(big_at, rechunk = FALSE),
rechunk = as_polars_df(big_at, rechunk = TRUE),
via_nanoarrow_c_stream = as_polars_df(as_nanoarrow_array_stream(big_at, experimental = TRUE)),
via_nanoarrow = as_polars_df(as_nanoarrow_array_stream(big_at)),
via_r = as_polars_df(as.data.frame(big_at)),
check = FALSE,
min_iterations = 5
)
#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 5 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 not_rechunk 336.7ms 366.83ms 2.61 8.62MB 0.521
#> 2 rechunk 238.9ms 278.98ms 3.64 1.12MB 0
#> 3 via_nanoarrow_c_stream 122.8ms 130.09ms 7.57 159.72KB 1.51
#> 4 via_nanoarrow 110.3ms 114.75ms 8.06 14.14KB 0
#> 5 via_r 1.5s 1.69s 0.589 167.48MB 0.707 Re ArrowTabular, I think it's because they no longer parallelize for the conversion from ArrowTabular. I think we need to add this parallelization process to the C stream interface. |
In the latest commit e3b9567, an additional Detailslibrary(polars)
library(nanoarrow)
library(arrow, warn.conflicts = FALSE)
polars_info()
#> Polars R package version : 0.16.3.9000
#> Rust Polars crate version: 0.39.2
#>
#> Thread pool size: 16
#>
#> Features:
#> default TRUE
#> full_features TRUE
#> disable_limit_max_threads TRUE
#> nightly TRUE
#> sql TRUE
#> rpolars_debug_print FALSE
#>
#> Code completion: deactivated
arrow_info()
#> Arrow package version: 15.0.1
#>
#> Capabilities:
#>
#> acero TRUE
#> dataset TRUE
#> substrait FALSE
#> parquet TRUE
#> json TRUE
#> s3 TRUE
#> gcs TRUE
#> utf8proc TRUE
#> re2 TRUE
#> snappy TRUE
#> gzip TRUE
#> brotli TRUE
#> zstd TRUE
#> lz4 TRUE
#> lz4_frame TRUE
#> lzo FALSE
#> bz2 TRUE
#> jemalloc TRUE
#> mimalloc TRUE
#>
#> Memory:
#>
#> Allocator jemalloc
#> Current 0 bytes
#> Max 0 bytes
#>
#> Runtime:
#>
#> SIMD Level avx2
#> Detected SIMD Level avx2
#>
#> Build:
#>
#> C++ Library Version 15.0.1
#> C++ Compiler GNU
#> C++ Compiler Version 11.4.0
big_at = do.call(rbind, lapply(1:5, \(x) as_arrow_table(nycflights13::flights)))
bench::mark(
not_rechunk = as_polars_df(big_at, rechunk = FALSE),
not_rechunk_cs = as_polars_df(big_at, rechunk = FALSE, experimental = TRUE),
rechunk = as_polars_df(big_at, rechunk = TRUE),
rechunk_cs = as_polars_df(big_at, rechunk = TRUE, experimental = TRUE),
via_nanoarrow = as_polars_df(as_nanoarrow_array_stream(big_at)),
via_nanoarrow_cs = as_polars_df(as_nanoarrow_array_stream(big_at), experimental = TRUE),
via_r = as_polars_df(as.data.frame(big_at)),
check = FALSE,
min_iterations = 10
)
#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 7 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 not_rechunk 170.49ms 184.87ms 5.03 6.62MB 2.01
#> 2 not_rechunk_cs 227.17ms 294.13ms 3.52 3.44MB 0.705
#> 3 rechunk 170.82ms 192.14ms 5.12 1.28MB 2.05
#> 4 rechunk_cs 277.19ms 326.41ms 2.85 52.02KB 0.571
#> 5 via_nanoarrow 120.08ms 151.54ms 6.56 159.72KB 0
#> 6 via_nanoarrow_cs 164.08ms 350.81ms 3.11 14.14KB 0
#> 7 via_r 1.58s 1.79s 0.491 167.48MB 0.834 Created on 2024-05-07 with reprex v2.1.0 In comparison, the conventional implementation is clearly faster. However, the conversion via nanoarrow is much faster....... @etiennebacher Could you please check the performance? I think it is possible that the slow conversion via the C stream interface could be improved upstream; there is a large demand in Python to implement the C stream interface to remove the |
Here's what I get: library(polars)
library(nanoarrow)
library(arrow, warn.conflicts = FALSE)
big_at = do.call(rbind, lapply(1:5, \(x) as_arrow_table(nycflights13::flights)))
bench::mark(
not_rechunk = as_polars_df(big_at, rechunk = FALSE),
not_rechunk_cs = as_polars_df(big_at, rechunk = FALSE, experimental = TRUE),
rechunk = as_polars_df(big_at, rechunk = TRUE),
rechunk_cs = as_polars_df(big_at, rechunk = TRUE, experimental = TRUE),
via_nanoarrow = as_polars_df(as_nanoarrow_array_stream(big_at)),
via_nanoarrow_cs = as_polars_df(as_nanoarrow_array_stream(big_at), experimental = TRUE),
via_r = as_polars_df(as.data.frame(big_at)),
check = FALSE,
min_iterations = 50
)
#> # A tibble: 7 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 not_rechunk 81.1ms 183.87ms 5.70 6.5MB 4.86
#> 2 not_rechunk_cs 268.72ms 396.62ms 2.55 3.46MB 0.559
#> 3 rechunk 184.21ms 202.18ms 4.68 1.28MB 4.32
#> 4 rechunk_cs 241.51ms 408.14ms 2.54 52.02KB 0.636
#> 5 via_nanoarrow 159.91ms 202.8ms 4.01 159.72KB 0.0819
#> 6 via_nanoarrow_cs 183.72ms 202.39ms 3.16 14.14KB 0.0644
#> 7 via_r 1.51s 1.57s 0.643 167.48MB 16.1 For some reason, #> # A tibble: 4 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 rechunk 84.8ms 116.3ms 7.87 6.43MB 7.26
#> 2 not_rechunk 73.5ms 96.4ms 10.0 229.05KB 9.25
#> 3 via_nanoarrow 66.6ms 131ms 3.43 1.75MB 0.0699
#> 4 via_r 566.3ms 707.9ms 1.31 167.41MB 24.9 |
@etiennebacher Thanks for checking! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think it's fine, thanks!
@@ -9,9 +9,11 @@ | |||
#' If schema names or types do not match `x`, the columns will be renamed/recast. | |||
#' If `NULL` (default), convert columns as is. | |||
#' @param schema_overrides named list of DataTypes. Cast some columns to the DataType. | |||
#' @param experimental If `TRUE`, use the Arrow C stream interface. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you inherit that param from as_polars_series()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point.
This is a internal function and doesn't have Rd file, so roxygen comment inheritance does not make sense.
Another interesting result. If only zero-copy types (here float64) are included and there are many chunks, it is much faster via the C stream interface, as expected. library(polars)
library(nanoarrow)
library(arrow, warn.conflicts = FALSE)
big_at = do.call(rbind, lapply(1:100, \(x) as_arrow_table(mtcars)))
bench::mark(
not_rechunk = as_polars_df(big_at, rechunk = FALSE),
not_rechunk_cs = as_polars_df(big_at, rechunk = FALSE, experimental = TRUE),
rechunk = as_polars_df(big_at, rechunk = TRUE),
rechunk_cs = as_polars_df(big_at, rechunk = TRUE, experimental = TRUE),
via_nanoarrow = as_polars_df(as_nanoarrow_array_stream(big_at)),
via_nanoarrow_cs = as_polars_df(as_nanoarrow_array_stream(big_at), experimental = TRUE),
via_r = as_polars_df(as.data.frame(big_at)),
check = FALSE,
min_iterations = 10
)
#> Warning: Some expressions had a GC in every iteration; so filtering is
#> disabled.
#> # A tibble: 7 × 6
#> expression min median `itr/sec` mem_alloc `gc/sec`
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt> <dbl>
#> 1 not_rechunk 961.75ms 1.04s 0.930 10.28MB 3.81
#> 2 not_rechunk_cs 35.19ms 40.13ms 23.3 3.42MB 3.88
#> 3 rechunk 1.05s 1.12s 0.898 3.25MB 3.59
#> 4 rechunk_cs 41.08ms 46.23ms 20.2 33.41KB 3.68
#> 5 via_nanoarrow 80.15ms 100.23ms 7.83 199.01KB 1.57
#> 6 via_nanoarrow_cs 6.61ms 9.77ms 89.0 4.3KB 1.98
#> 7 via_r 14.42ms 18.84ms 50.4 614.66KB 1.94 Created on 2024-05-08 with reprex v2.1.0 |
Close #732
Organize functions to import Arrow C Stream,
and use that inside ofas_polars_series()
andas_polars_df()
fornanoarrow_array_stream
etc.(Edit: This change has been undone due to observed performance degradation)
With the addition of this function, several functions that use the Arrow C Stream interface that existed (but were not used) are no longer needed and will be removed.
I have tried quite a few things, but it seems that the external pointer used when importing streams cannot be from another package and must be provided here.
I am convinced that this function is very dangerous and should not be exported, and that the conversion should basically be done via
nanoarrow_array_stream
.In other words, the
import_stream
method will not be added to the DataFrame; useas_polars_df(<nanoarrow_array_stream>)
.Running the same benchmarks as in #896, we can confirm that performance has not decreased.(Edit: Running other benchmarks confirms the performance degradation. See comments like #1078 (comment).)
Created on 2024-05-05 with reprex v2.1.0