Skip to content

Commit

Permalink
refactor: centralize infallible available_parallelism fn. Use bette…
Browse files Browse the repository at this point in the history
…r channel size limit in arrow read
  • Loading branch information
sdd committed Aug 13, 2024
1 parent bb8b9f5 commit 458f256
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 9 deletions.
11 changes: 5 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::io::{FileIO, FileMetadata, FileRead};
use crate::runtime::spawn;
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, Schema};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

/// Builder to create ArrowReader
Expand All @@ -58,9 +59,7 @@ pub struct ArrowReaderBuilder {
impl ArrowReaderBuilder {
/// Create a new ArrowReaderBuilder
pub(crate) fn new(file_io: FileIO) -> Self {
let num_cpus = std::thread::available_parallelism()
.expect("failed to get number of CPUs")
.get();
let num_cpus = available_parallelism().get();

ArrowReaderBuilder {
batch_size: None,
Expand Down Expand Up @@ -109,16 +108,16 @@ impl ArrowReader {
pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
let max_concurrent_fetching_datafiles = self.concurrency_limit_data_files;
let concurrency_limit_data_files = self.concurrency_limit_data_files;

let (tx, rx) = channel(10);
let (tx, rx) = channel(concurrency_limit_data_files);
let mut channel_for_error = tx.clone();

spawn(async move {
let result = tasks
.map(|task| Ok((task, file_io.clone(), tx.clone())))
.try_for_each_concurrent(
max_concurrent_fetching_datafiles,
concurrency_limit_data_files,
|(file_scan_task, file_io, tx)| async move {
match file_scan_task {
Ok(task) => {
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ pub mod transform;
mod runtime;

pub mod arrow;
mod utils;
pub mod writer;
5 changes: 2 additions & 3 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::spec::{
SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind, Result};

/// A stream of [`FileScanTask`].
Expand All @@ -62,9 +63,7 @@ pub struct TableScanBuilder<'a> {

impl<'a> TableScanBuilder<'a> {
pub(crate) fn new(table: &'a Table) -> Self {
let num_cpus = std::thread::available_parallelism()
.expect("failed to get number of CPUs")
.get();
let num_cpus = available_parallelism().get();

Self {
table,
Expand Down
25 changes: 25 additions & 0 deletions crates/iceberg/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::num::NonZero;

// Use a default value of 1 as the safest option.
// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
// for more details.
//
// The value is stored as a `NonZero<usize>` and constructed in a const
// context, so an invalid (zero) default is rejected at compile time instead
// of panicking at runtime on the rarely-exercised fallback path.
const DEFAULT_PARALLELISM: NonZero<usize> = match NonZero::<usize>::new(1) {
    Some(value) => value,
    None => panic!("DEFAULT_PARALLELISM must be non-zero"),
};

/// Returns an estimate of the default amount of parallelism that should be
/// used, as reported by [`std::thread::available_parallelism`].
///
/// [`std::thread::available_parallelism`] returns a `Result` because it can
/// fail; this wrapper is infallible and falls back to
/// `DEFAULT_PARALLELISM` when the underlying call returns an error.
///
/// Note: we don't use a OnceCell or LazyCell here as there
/// are circumstances where the level of available
/// parallelism can change during the lifetime of an executing
/// process, but this should not be called in a hot loop.
pub(crate) fn available_parallelism() -> NonZero<usize> {
    std::thread::available_parallelism().unwrap_or_else(|_err| {
        // Failed to get the level of parallelism.
        // TODO: log/trace when this fallback occurs.

        // Using a default value; no runtime `unwrap` is needed because the
        // constant is already a validated `NonZero<usize>`.
        DEFAULT_PARALLELISM
    })
}

0 comments on commit 458f256

Please sign in to comment.