Commit 790650c ("c")

nameexhaustion committed Sep 10, 2024
1 parent 76a340b

Showing 9 changed files with 71 additions and 47 deletions.
6 changes: 6 additions & 0 deletions crates/polars-io/src/utils/byte_source.rs
@@ -150,6 +150,12 @@ impl From<ObjectStoreByteSource> for DynByteSource {
     }
 }
 
+impl From<MemSlice> for DynByteSource {
+    fn from(value: MemSlice) -> Self {
+        Self::MemSlice(MemSliceByteSource(value))
+    }
+}
+
 #[derive(Clone, Debug)]
 pub enum DynByteSourceBuilder {
     Mmap,
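The new `From<MemSlice>` impl means an in-memory slice can be wrapped directly as a `DynByteSource`. A minimal usage sketch; the `MemSlice::from_vec` constructor is an assumption, since this diff only shows `MemSlice::from_file` and `MemSlice::from_bytes`:

use polars_io::utils::byte_source::DynByteSource;
use polars_utils::mmap::MemSlice;

// Wrap an owned buffer as a byte source without touching disk or network.
// `MemSlice::from_vec` is an assumed constructor, used here for brevity.
fn wrap_in_memory(bytes: Vec<u8>) -> DynByteSource {
    MemSlice::from_vec(bytes).into()
}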
18 changes: 18 additions & 0 deletions crates/polars-plan/src/plans/ir/scan_sources.rs
@@ -3,6 +3,8 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
 use polars_core::error::{feature_gated, PolarsResult};
+use polars_io::cloud::CloudOptions;
+use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};
 use polars_utils::mmap::MemSlice;
 use polars_utils::pl_str::PlSmallStr;
 
@@ -237,6 +239,22 @@ impl<'a> ScanSourceRef<'a> {
             Self::Buffer(buff) => Ok(MemSlice::from_bytes((*buff).clone())),
         }
     }
+
+    pub async fn to_dyn_byte_source(
+        &self,
+        builder: &DynByteSourceBuilder,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<DynByteSource> {
+        match self {
+            Self::Path(path) => {
+                builder
+                    .try_build_from_path(path.to_str().unwrap(), cloud_options)
+                    .await
+            },
+            Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)),
+            Self::Buffer(buff) => Ok(DynByteSource::from(MemSlice::from_bytes((*buff).clone()))),
+        }
+    }
 }
 
 impl<'a> Iterator for ScanSourceIter<'a> {
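With `to_dyn_byte_source`, every source variant (path, open file, or in-memory buffer) resolves to a `DynByteSource` through one async call. A hedged driver sketch built only from APIs visible in this diff (`len`, `get`, `to_dyn_byte_source`); the helper itself is illustrative, not part of the commit:

use polars_core::error::PolarsResult;
use polars_io::cloud::CloudOptions;
use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};
use polars_plan::plans::ScanSources;

// Resolve every scan source to a byte source up front.
async fn open_all(
    sources: &ScanSources,
    builder: &DynByteSourceBuilder,
    cloud_options: Option<&CloudOptions>,
) -> PolarsResult<Vec<DynByteSource>> {
    let mut out = Vec::with_capacity(sources.len());
    for idx in 0..sources.len() {
        // `get` returns Option<ScanSourceRef<'_>>; `idx` is always in range here.
        let source = sources.get(idx).unwrap();
        out.push(source.to_dyn_byte_source(builder, cloud_options).await?);
    }
    Ok(out)
}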
2 changes: 1 addition & 1 deletion crates/polars-python/Cargo.toml
@@ -246,7 +246,7 @@ all = [
   "binary_encoding",
   "ffi_plugin",
   "polars_cloud",
-  # "new_streaming",
+  "new_streaming",
 ]
 
 # we cannot conditionally activate simd
65 changes: 35 additions & 30 deletions crates/polars-stream/src/nodes/parquet_source.rs
@@ -1,5 +1,4 @@
 use std::future::Future;
-use std::path::PathBuf;
 use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 
@@ -22,10 +21,10 @@ use polars_io::utils::byte_source::{
     ByteSource, DynByteSource, DynByteSourceBuilder, MemSliceByteSource,
 };
 use polars_io::utils::slice::SplitSlicePosition;
-use polars_io::{is_cloud_url, RowIndex};
+use polars_io::RowIndex;
 use polars_parquet::read::RowGroupMetaData;
 use polars_plan::plans::hive::HivePartitions;
-use polars_plan::plans::FileInfo;
+use polars_plan::plans::{FileInfo, ScanSources};
 use polars_plan::prelude::FileScanOptions;
 use polars_utils::mmap::MemSlice;
 use polars_utils::pl_str::PlSmallStr;
 
@@ -46,7 +45,7 @@ type AsyncTaskData = Option<(
 
 #[allow(clippy::type_complexity)]
 pub struct ParquetSourceNode {
-    paths: Arc<[PathBuf]>,
+    scan_sources: ScanSources,
     file_info: FileInfo,
     hive_parts: Option<Arc<Vec<HivePartitions>>>,
     predicate: Option<Arc<dyn PhysicalExpr>>,
 
@@ -71,7 +70,7 @@ pub struct ParquetSourceNode {
 #[allow(clippy::too_many_arguments)]
 impl ParquetSourceNode {
     pub fn new(
-        paths: Arc<[PathBuf]>,
+        scan_sources: ScanSources,
         file_info: FileInfo,
         hive_parts: Option<Arc<Vec<HivePartitions>>>,
         predicate: Option<Arc<dyn PhysicalExpr>>,
 
@@ -81,16 +80,15 @@ impl ParquetSourceNode {
     ) -> Self {
         let verbose = config::verbose();
 
-        let byte_source_builder =
-            if is_cloud_url(paths[0].to_str().unwrap()) || config::force_async() {
-                DynByteSourceBuilder::ObjectStore
-            } else {
-                DynByteSourceBuilder::Mmap
-            };
+        let byte_source_builder = if scan_sources.is_cloud_url() || config::force_async() {
+            DynByteSourceBuilder::ObjectStore
+        } else {
+            DynByteSourceBuilder::Mmap
+        };
         let memory_prefetch_func = get_memory_prefetch_func(verbose);
 
         Self {
-            paths,
+            scan_sources,
             file_info,
             hive_parts,
             predicate,
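`ScanSources::is_cloud_url()` replaces the old check against `paths[0]`. A hedged sketch of what the method plausibly does, inferred from the call site it replaces; the `Paths` variant name and layout are assumptions not shown in this diff:

// Hypothetical reconstruction, for illustration only.
impl ScanSources {
    pub fn is_cloud_url(&self) -> bool {
        match self {
            // Mirrors the old `is_cloud_url(paths[0].to_str().unwrap())` check.
            Self::Paths(paths) => paths.first().is_some_and(polars_io::is_cloud_url),
            // Open files and in-memory buffers are never cloud-backed.
            _ => false,
        }
    }
}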
@@ -570,23 +568,25 @@ impl ParquetSourceNode {
         }
 
         let fetch_metadata_bytes_for_path_index = {
-            let paths = &self.paths;
+            let scan_sources = &self.scan_sources;
             let cloud_options = Arc::new(self.cloud_options.clone());
 
-            let paths = paths.clone();
+            let scan_sources = scan_sources.clone();
             let cloud_options = cloud_options.clone();
             let byte_source_builder = byte_source_builder.clone();
 
             move |path_idx: usize| {
-                let paths = paths.clone();
+                let scan_sources = scan_sources.clone();
                 let cloud_options = cloud_options.clone();
                 let byte_source_builder = byte_source_builder.clone();
 
                 let handle = io_runtime.spawn(async move {
                     let mut byte_source = Arc::new(
-                        byte_source_builder
-                            .try_build_from_path(
-                                paths[path_idx].to_str().unwrap(),
+                        scan_sources
+                            .get(path_idx)
+                            .unwrap()
+                            .to_dyn_byte_source(
+                                &byte_source_builder,
                                 cloud_options.as_ref().as_ref(),
                             )
                             .await?,
@@ -681,13 +681,13 @@ impl ParquetSourceNode {
                 .slice
                 .map(|(offset, len)| offset as usize..offset as usize + len);
 
-            let mut metadata_stream = futures::stream::iter(0..self.paths.len())
+            let mut metadata_stream = futures::stream::iter(0..self.scan_sources.len())
                 .map(fetch_metadata_bytes_for_path_index)
                 .buffered(metadata_prefetch_size)
                 .map(process_metadata_bytes)
                 .buffered(metadata_decode_ahead_size);
 
-            let paths = self.paths.clone();
+            let scan_sources = self.scan_sources.clone();
 
             // We need to be able to both stop early as well as skip values, which is easier to do
             // using a custom task instead of futures::stream
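The metadata pipeline above relies on ordered concurrency from the `futures` crate: `buffered(n)` polls up to `n` futures at once while yielding results in input order, which is what lets metadata prefetch run ahead of decoding. A self-contained sketch of the same pattern (toy values, not polars code):

use futures::stream::{self, StreamExt};

async fn prefetch_demo() {
    let results: Vec<usize> = stream::iter(0..8usize)
        // Stand-in for an async metadata fetch keyed by file index.
        .map(|idx| async move { idx * 2 })
        // At most three fetches in flight; output order stays 0, 1, 2, ...
        .buffered(3)
        .collect()
        .await;
    assert_eq!(results, vec![0, 2, 4, 6, 8, 10, 12, 14]);
}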
@@ -715,9 +715,11 @@ impl ParquetSourceNode {
                             .map_err(|err| {
                                 err.wrap_msg(|msg| {
                                     format!(
-                                        "error at path (index: {}, path: {}): {}",
+                                        "error at path (index: {}, path: {:?}): {}",
                                         current_path_index,
-                                        paths[current_path_index].to_str().unwrap(),
+                                        scan_sources
+                                            .get(current_path_index)
+                                            .map(|x| PlSmallStr::from_str(x.to_include_path_name())),
                                         msg
                                     )
                                 })
 
@@ -771,7 +773,7 @@ impl ParquetSourceNode {
                                 Stopped reading at file at index {} \
                                 (remaining {} files will not be read)",
                                 current_path_index,
-                                paths.len() - current_path_index - 1,
+                                scan_sources.len() - current_path_index - 1,
                             );
                         }
                         break;
 
@@ -786,7 +788,7 @@ impl ParquetSourceNode {
             let slice = self.file_options.slice.unwrap();
             let slice_start_as_n_from_end = -slice.0 as usize;
 
-            let mut metadata_stream = futures::stream::iter((0..self.paths.len()).rev())
+            let mut metadata_stream = futures::stream::iter((0..self.scan_sources.len()).rev())
                 .map(fetch_metadata_bytes_for_path_index)
                 .buffered(metadata_prefetch_size)
                 .map(process_metadata_bytes)
 
@@ -831,7 +833,7 @@ impl ParquetSourceNode {
                 PolarsResult::Ok((slice_range, processed_metadata_rev, cum_rows))
             };
 
-            let path_count = self.paths.len();
+            let path_count = self.scan_sources.len();
 
             io_runtime.spawn(async move {
                 if start_rx.await.is_err() {
 
@@ -935,7 +937,7 @@ impl ParquetSourceNode {
         );
         assert_eq!(self.predicate.is_some(), self.physical_predicate.is_some());
 
-        let paths = self.paths.clone();
+        let scan_sources = self.scan_sources.clone();
         let hive_partitions = self.hive_parts.clone();
         let hive_partitions_width = hive_partitions
             .as_deref()
 
@@ -948,7 +950,7 @@ impl ParquetSourceNode {
         let ideal_morsel_size = get_ideal_morsel_size();
 
         RowGroupDecoder {
-            paths,
+            scan_sources,
             hive_partitions,
             hive_partitions_width,
             include_file_paths,
 
@@ -983,7 +985,7 @@ impl ParquetSourceNode {
             eprintln!(
                 "[ParquetSource]: {} columns to be projected from {} files",
                 self.projected_arrow_fields.len(),
-                self.paths.len(),
+                self.scan_sources.len(),
             );
         }
     }
 
@@ -1355,7 +1357,7 @@ struct SharedFileState {
 
 /// Turns row group data into DataFrames.
 struct RowGroupDecoder {
-    paths: Arc<[PathBuf]>,
+    scan_sources: ScanSources,
     hive_partitions: Option<Arc<Vec<HivePartitions>>>,
     hive_partitions_width: usize,
     include_file_paths: Option<PlSmallStr>,
 
@@ -1520,7 +1522,10 @@ impl RowGroupDecoder {
         let file_path_series = self.include_file_paths.clone().map(|file_path_col| {
             StringChunked::full(
                 file_path_col,
-                self.paths[path_index].to_str().unwrap(),
+                self.scan_sources
+                    .get(path_index)
+                    .unwrap()
+                    .to_include_path_name(),
                 row_group_data.file_max_row_group_height,
             )
             .into_series()
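`to_include_path_name` supplies the display string used for both error messages and the `include_file_paths` column, replacing direct `Path::to_str` calls. Its body is not part of this diff; a hypothetical sketch consistent with the call sites (the placeholder names for non-path sources are invented here):

// Hypothetical sketch only; the real implementation is not shown in this diff.
impl<'a> ScanSourceRef<'a> {
    pub fn to_include_path_name(&self) -> &str {
        match self {
            Self::Path(path) => path.to_str().unwrap(),
            // Non-path sources still need a stable display name.
            Self::File(_) => "open-file",
            Self::Buffer(_) => "in-mem",
        }
    }
}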
8 changes: 4 additions & 4 deletions crates/polars-stream/src/physical_plan/fmt.rs
@@ -1,7 +1,7 @@
 use std::fmt::Write;
 
 use polars_plan::plans::expr_ir::ExprIR;
-use polars_plan::plans::{AExpr, EscapeLabel, FileScan, PathsDisplay};
+use polars_plan::plans::{AExpr, EscapeLabel, FileScan, ScanSourcesDisplay};
 use polars_utils::arena::Arena;
 use polars_utils::itertools::Itertools;
 use slotmap::{Key, SecondaryMap, SlotMap};
 
@@ -107,7 +107,7 @@ fn visualize_plan_rec(
         },
         PhysNodeKind::Multiplexer { input } => ("multiplexer".to_string(), from_ref(input)),
         PhysNodeKind::FileScan {
-            paths,
+            scan_sources,
             file_info,
             hive_parts,
             output_schema: _,
 
@@ -127,9 +127,9 @@ fn visualize_plan_rec(
             let mut f = EscapeLabel(&mut out);
 
             {
-                let paths_display = PathsDisplay(paths.as_ref());
+                let disp = ScanSourcesDisplay(scan_sources);
 
-                write!(f, "\npaths: {}", paths_display).unwrap();
+                write!(f, "\npaths: {}", disp).unwrap();
             }
 
             {
8 changes: 2 additions & 6 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
@@ -331,7 +331,7 @@ pub fn lower_ir(
 
         v @ IR::Scan { .. } => {
             let IR::Scan {
-                sources,
+                sources: scan_sources,
                 file_info,
                 hive_parts,
                 output_schema,
 
@@ -343,12 +343,8 @@ pub fn lower_ir(
                 unreachable!();
             };
 
-            let paths = sources
-                .into_paths()
-                .unwrap_or_else(|| todo!("streaming scanning of in-memory buffers"));
-
             PhysNodeKind::FileScan {
-                paths,
+                scan_sources,
                 file_info,
                 hive_parts,
                 output_schema,
5 changes: 2 additions & 3 deletions crates/polars-stream/src/physical_plan/mod.rs
@@ -1,12 +1,11 @@
-use std::path::PathBuf;
 use std::sync::Arc;
 
 use polars_core::frame::DataFrame;
 use polars_core::prelude::{InitHashMaps, PlHashMap, SortMultipleOptions};
 use polars_core::schema::{Schema, SchemaRef};
 use polars_error::PolarsResult;
 use polars_plan::plans::hive::HivePartitions;
-use polars_plan::plans::{AExpr, DataFrameUdf, FileInfo, FileScan, IR};
+use polars_plan::plans::{AExpr, DataFrameUdf, FileInfo, FileScan, ScanSources, IR};
 use polars_plan::prelude::expr_ir::ExprIR;
 
 mod fmt;
 
@@ -119,7 +118,7 @@ pub enum PhysNodeKind {
     },
 
     FileScan {
-        paths: Arc<[PathBuf]>,
+        scan_sources: ScanSources,
         file_info: FileInfo,
         hive_parts: Option<Arc<Vec<HivePartitions>>>,
         predicate: Option<ExprIR>,
4 changes: 2 additions & 2 deletions crates/polars-stream/src/physical_plan/to_graph.rs
@@ -256,7 +256,7 @@ fn to_graph_rec<'a>(
 
         v @ FileScan { .. } => {
             let FileScan {
-                paths,
+                scan_sources,
                 file_info,
                 hive_parts,
                 output_schema,
 
@@ -298,7 +298,7 @@ fn to_graph_rec<'a>(
             if std::env::var("POLARS_DISABLE_PARQUET_SOURCE").as_deref() != Ok("1") {
                 ctx.graph.add_node(
                     nodes::parquet_source::ParquetSourceNode::new(
-                        paths,
+                        scan_sources,
                         file_info,
                         hive_parts,
                         predicate,
2 changes: 1 addition & 1 deletion py-polars/polars/lazyframe/frame.py
@@ -1969,7 +1969,7 @@ def collect(
         │ c   ┆ 6   ┆ 1   │
         └─────┴─────┴─────┘
         """
-        new_streaming = _kwargs.get("new_streaming", False)
+        new_streaming = _kwargs.get("new_streaming", True)
 
         if no_optimization or _eager:
             predicate_pushdown = False
