Skip to content

Commit

Permalink
almost working further file support
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Sep 6, 2024
1 parent 2cc4db5 commit fbdbe4c
Show file tree
Hide file tree
Showing 29 changed files with 367 additions and 618 deletions.
20 changes: 10 additions & 10 deletions crates/polars-io/src/csv/read/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,7 @@ pub fn count_rows(

let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
let owned = &mut vec![];
let mut reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;

for _ in 0..reader_bytes.len() {
if reader_bytes[0] != eol_char {
break;
}

reader_bytes = &reader_bytes[1..];
}
let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;

count_rows_from_slice(
reader_bytes,
Expand All @@ -63,13 +55,21 @@ pub fn count_rows(
/// Read the number of rows without parsing columns
/// useful for count(*) queries
pub fn count_rows_from_slice(
bytes: &[u8],
mut bytes: &[u8],
separator: u8,
quote_char: Option<u8>,
comment_prefix: Option<&CommentPrefix>,
eol_char: u8,
has_header: bool,
) -> PolarsResult<usize> {
for _ in 0..bytes.len() {
if bytes[0] != eol_char {
break;
}

bytes = &bytes[1..];
}

const MIN_ROWS_PER_THREAD: usize = 1024;
let max_threads = POOL.current_num_threads();

Expand Down
6 changes: 6 additions & 0 deletions crates/polars-io/src/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ impl MmapBytesReader for BufReader<File> {
}
}

impl MmapBytesReader for BufReader<&File> {
fn to_file(&self) -> Option<&File> {
Some(self.get_ref())
}
}

impl<T> MmapBytesReader for Cursor<T>
where
T: AsRef<[u8]> + Send + Sync,
Expand Down
16 changes: 13 additions & 3 deletions crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl LazyCsvReader {
}

pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
Self::new_with_sources(ScanSources::Files(paths))
Self::new_with_sources(ScanSources::Paths(paths))
}

pub fn new_with_sources(sources: ScanSources) -> Self {
Expand All @@ -47,7 +47,7 @@ impl LazyCsvReader {
}

pub fn new(path: impl AsRef<Path>) -> Self {
Self::new_with_sources(ScanSources::Files([path.as_ref().to_path_buf()].into()))
Self::new_with_sources(ScanSources::Paths([path.as_ref().to_path_buf()].into()))
}

/// Skip this number of rows after the header location.
Expand Down Expand Up @@ -254,7 +254,7 @@ impl LazyCsvReader {
};

let schema = match self.sources.clone() {
ScanSources::Files(paths) => {
ScanSources::Paths(paths) => {
// TODO: Path expansion should happen when converting to the IR
// https://github.com/pola-rs/polars/issues/17634
let paths = expand_paths(&paths[..], self.glob(), self.cloud_options())?;
Expand All @@ -266,6 +266,16 @@ impl LazyCsvReader {
let mut file = polars_utils::open_file(path)?;
infer_schema(get_reader_bytes(&mut file).expect("could not mmap file"))?
},
ScanSources::Files(files) => {
let Some(file) = files.first() else {
polars_bail!(ComputeError: "no buffers specified for this reader");
};

infer_schema(
get_reader_bytes(&mut std::io::BufReader::new(file))
.expect("could not mmap file"),
)?
},
ScanSources::Buffers(buffers) => {
let Some(buffer) = buffers.first() else {
polars_bail!(ComputeError: "no buffers specified for this reader");
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-lazy/src/scan/file_list_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub trait LazyFileListReader: Clone {
return self.finish_no_glob();
}

let ScanSources::Files(paths) = self.sources() else {
unreachable!("in-memory buffers should never be globbed");
let ScanSources::Paths(paths) = self.sources() else {
unreachable!("opened-files or in-memory buffers should never be globbed");
};

let lfs = paths
Expand Down Expand Up @@ -93,7 +93,7 @@ pub trait LazyFileListReader: Clone {
/// Set paths of the scanned files.
#[must_use]
fn with_paths(self, paths: Arc<[PathBuf]>) -> Self {
self.with_sources(ScanSources::Files(paths))
self.with_sources(ScanSources::Paths(paths))
}

/// Configure the row limit.
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ impl LazyFrame {
/// Create a LazyFrame directly from a ipc scan.
pub fn scan_ipc(path: impl AsRef<Path>, args: ScanArgsIpc) -> PolarsResult<Self> {
Self::scan_ipc_sources(
ScanSources::Files([path.as_ref().to_path_buf()].into()),
ScanSources::Paths([path.as_ref().to_path_buf()].into()),
args,
)
}

pub fn scan_ipc_files(paths: Arc<[PathBuf]>, args: ScanArgsIpc) -> PolarsResult<Self> {
Self::scan_ipc_sources(ScanSources::Files(paths), args)
Self::scan_ipc_sources(ScanSources::Paths(paths), args)
}

pub fn scan_ipc_sources(sources: ScanSources, args: ScanArgsIpc) -> PolarsResult<Self> {
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct LazyJsonLineReader {

impl LazyJsonLineReader {
pub fn new_paths(paths: Arc<[PathBuf]>) -> Self {
Self::new_with_sources(ScanSources::Files(paths))
Self::new_with_sources(ScanSources::Paths(paths))
}

pub fn new_with_sources(sources: ScanSources) -> Self {
Expand All @@ -50,7 +50,7 @@ impl LazyJsonLineReader {
}

pub fn new(path: impl AsRef<Path>) -> Self {
Self::new_with_sources(ScanSources::Files([path.as_ref().to_path_buf()].into()))
Self::new_with_sources(ScanSources::Paths([path.as_ref().to_path_buf()].into()))
}

/// Add a row index column.
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl LazyFrame {
/// Create a LazyFrame directly from a parquet scan.
pub fn scan_parquet(path: impl AsRef<Path>, args: ScanArgsParquet) -> PolarsResult<Self> {
Self::scan_parquet_sources(
ScanSources::Files([path.as_ref().to_path_buf()].into()),
ScanSources::Paths([path.as_ref().to_path_buf()].into()),
args,
)
}
Expand All @@ -152,6 +152,6 @@ impl LazyFrame {

/// Create a LazyFrame directly from a parquet scan.
pub fn scan_parquet_files(paths: Arc<[PathBuf]>, args: ScanArgsParquet) -> PolarsResult<Self> {
Self::scan_parquet_sources(ScanSources::Files(paths), args)
Self::scan_parquet_sources(ScanSources::Paths(paths), args)
}
}
22 changes: 1 addition & 21 deletions crates/polars-mem-engine/src/executors/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use polars_core::config;
use polars_core::utils::{
accumulate_dataframes_vertical, accumulate_dataframes_vertical_unchecked,
};
use polars_error::feature_gated;
use polars_utils::mmap::MemSlice;

use super::*;

Expand Down Expand Up @@ -68,25 +66,7 @@ impl CsvExec {
let source = self.sources.at(i);
let owned = &mut vec![];

let memslice = match source {
ScanSourceRef::File(path) => {
let file = if run_async {
feature_gated!("cloud", {
polars_io::file_cache::FILE_CACHE
.get_entry(path.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()
})
} else {
polars_utils::open_file(path)
}?;

let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
MemSlice::from_mmap(Arc::new(mmap))
},
ScanSourceRef::Buffer(buffer) => MemSlice::from_bytes(buffer.clone()),
};
let memslice = source.to_memslice_async_latest(run_async)?;

let reader = std::io::Cursor::new(maybe_decompress_bytes(&memslice, owned)?);
let mut df = options
Expand Down
11 changes: 7 additions & 4 deletions crates/polars-mem-engine/src/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub struct IpcExec {
impl IpcExec {
fn read(&mut self) -> PolarsResult<DataFrame> {
let is_cloud = match &self.sources {
ScanSources::Files(paths) => paths.iter().any(is_cloud_url),
ScanSources::Buffers(_) => false,
ScanSources::Paths(paths) => paths.iter().any(is_cloud_url),
ScanSources::Files(_) | ScanSources::Buffers(_) => false,
};
let force_async = config::force_async();

Expand Down Expand Up @@ -75,13 +75,16 @@ impl IpcExec {
let source = self.sources.at(index);

let memslice = match source {
ScanSourceRef::File(path) => {
ScanSourceRef::Path(path) => {
let file = match idx_to_cached_file(index) {
None => std::fs::File::open(path)?,
Some(f) => f?,
};

MemSlice::from_mmap(Arc::new(unsafe { memmap::Mmap::map(&file).unwrap() }))
MemSlice::from_mmap(Arc::new(unsafe { memmap::Mmap::map(&file)? }))
},
ScanSourceRef::File(file) => {
MemSlice::from_mmap(Arc::new(unsafe { memmap::Mmap::map(file)? }))
},
ScanSourceRef::Buffer(buff) => MemSlice::from_bytes(buff.clone()),
};
Expand Down
29 changes: 3 additions & 26 deletions crates/polars-mem-engine/src/executors/scan/ndjson.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use polars_core::config;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_error::feature_gated;
use polars_utils::mmap::MemSlice;

use super::*;

Expand Down Expand Up @@ -76,30 +74,9 @@ impl JsonExec {

let row_index = self.file_scan_options.row_index.as_mut();

let memslice = match source {
ScanSourceRef::File(path) => {
let file = if run_async {
feature_gated!("cloud", {
match polars_io::file_cache::FILE_CACHE
.get_entry(path.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()
{
Ok(v) => v,
Err(e) => return Some(Err(e)),
}
})
} else {
match polars_utils::open_file(path) {
Ok(v) => v,
Err(e) => return Some(Err(e)),
}
};

MemSlice::from_mmap(Arc::new(unsafe { memmap::Mmap::map(&file).unwrap() }))
},
ScanSourceRef::Buffer(buff) => MemSlice::from_bytes(buff.clone()),
let memslice = match source.to_memslice_async_latest(run_async) {
Ok(memslice) => memslice,
Err(err) => return Some(Err(err)),
};

let owned = &mut vec![];
Expand Down
22 changes: 2 additions & 20 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use polars_io::cloud::CloudOptions;
use polars_io::parquet::metadata::FileMetaDataRef;
use polars_io::utils::slice::split_slice_at_file;
use polars_io::RowIndex;
use polars_utils::mmap::MemSlice;

use super::*;

Expand Down Expand Up @@ -82,18 +81,7 @@ impl ParquetExec {
let row_counts = path_indexes
.into_par_iter()
.map(|&i| {
let memslice = match self.sources.at(i) {
ScanSourceRef::File(path) => {
let file = std::fs::File::open(path)?;
MemSlice::from_mmap(Arc::new(unsafe {
memmap::Mmap::map(&file).unwrap()
}))
},
ScanSourceRef::Buffer(buff) => {
MemSlice::from_bytes(buff.clone())
},
};

let memslice = self.sources.at(i).to_memslice()?;
ParquetReader::new(std::io::Cursor::new(memslice)).num_rows()
})
.collect::<PolarsResult<Vec<_>>>()?;
Expand Down Expand Up @@ -161,13 +149,7 @@ impl ParquetExec {
hive_partitions.as_deref(),
);

let memslice = match source {
ScanSourceRef::File(path) => {
let file = std::fs::File::open(path)?;
MemSlice::from_mmap(Arc::new(unsafe { memmap::Mmap::map(&file).unwrap() }))
},
ScanSourceRef::Buffer(buff) => MemSlice::from_bytes(buff.clone()),
};
let memslice = source.to_memslice()?;

let mut reader = ParquetReader::new(std::io::Cursor::new(memslice))
.read_parallel(parallel)
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-mem-engine/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use polars_utils::arena::{Arena, Node};
///
/// # Notes
///
/// - Scan sources with in-memory buffers are ignored.
/// - Scan sources with opened files or in-memory buffers are ignored.
pub(crate) fn agg_source_paths<'a>(
root_lp: Node,
acc_paths: &mut PlHashSet<&'a Path>,
Expand All @@ -18,8 +18,8 @@ pub(crate) fn agg_source_paths<'a>(
for (_, lp) in lp_arena.iter(root_lp) {
if let IR::Scan { sources, .. } = lp {
match sources {
ScanSources::Files(paths) => acc_paths.extend(paths.iter().map(|p| p.as_path())),
ScanSources::Buffers(_) => {
ScanSources::Paths(paths) => acc_paths.extend(paths.iter().map(|p| p.as_path())),
ScanSources::Buffers(_) | ScanSources::Files(_) => {
// Ignore
},
}
Expand Down
5 changes: 4 additions & 1 deletion crates/polars-plan/src/client/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ pub(super) fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> {
} => {
let sources_lock = sources.lock().unwrap();
match &sources_lock.sources {
ScanSources::Files(paths) => {
ScanSources::Paths(paths) => {
if paths.iter().any(|p| !is_cloud_url(p)) {
return ineligible_error("contains scan of local file system");
}
},
ScanSources::Files(_) => {
return ineligible_error("contains scan of opened files");
},
ScanSources::Buffers(_) => {
return ineligible_error("contains scan of in-memory buffer");
},
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ impl DslScanSources {
return Ok(());
}

let ScanSources::Files(paths) = &self.sources else {
let ScanSources::Paths(paths) = &self.sources else {
self.is_expanded = true;
return Ok(());
};
Expand All @@ -920,7 +920,7 @@ impl DslScanSources {

#[allow(unreachable_code)]
{
self.sources = ScanSources::Files(expanded_sources);
self.sources = ScanSources::Paths(expanded_sources);
self.is_expanded = true;

Ok(())
Expand Down
Loading

0 comments on commit fbdbe4c

Please sign in to comment.