diff --git a/crates/polars-io/src/cloud/glob.rs b/crates/polars-io/src/cloud/glob.rs
index c55d7767f39f..9ad4595faa53 100644
--- a/crates/polars-io/src/cloud/glob.rs
+++ b/crates/polars-io/src/cloud/glob.rs
@@ -1,8 +1,9 @@
 use futures::TryStreamExt;
 use object_store::path::Path;
 use polars_core::error::to_compute_err;
-use polars_core::prelude::{polars_ensure, polars_err};
-use polars_error::PolarsResult;
+use polars_core::prelude::polars_ensure;
+use polars_error::{polars_bail, PolarsResult};
+use polars_utils::format_pl_smallstr;
 use polars_utils::pl_str::PlSmallStr;
 use regex::Regex;
 use url::Url;
@@ -98,13 +99,16 @@ impl CloudLocation {
             }
 
             let key = parsed.path();
-            let bucket = parsed
-                .host()
-                .ok_or_else(
-                    || polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", parsed),
-                )?
-                .to_string()
-                .into();
+
+            let bucket = format_pl_smallstr!(
+                "{}",
+                &parsed[url::Position::BeforeUsername..url::Position::AfterPort]
+            );
+
+            if bucket.is_empty() {
+                polars_bail!(ComputeError: "CloudLocation::from_url(): empty bucket: {}", parsed);
+            }
+
             (bucket, key)
         };
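A note on the `url::Position` slicing this hunk (and the cache-key change below) relies on: indexing a parsed `Url` with a range of positions yields the exact substring, so `BeforeUsername..AfterPort` captures userinfo, host, and port. The userinfo part matters for ADLS paths, where the container sits before the `@`. A minimal sketch assuming only the `url` crate; the URLs in the asserts are illustrative:

```rust
use url::{Position, Url};

fn main() {
    // s3-style URL: userinfo, host and port all land in the "bucket" slice.
    let u = Url::parse("s3://user:pass@bucket.example:9000/key/file.parquet").unwrap();
    assert_eq!(
        &u[Position::BeforeUsername..Position::AfterPort],
        "user:pass@bucket.example:9000"
    );

    // ADLS-style URL: the container is in the userinfo part, before the '@'.
    let adls = Url::parse("abfss://data@myaccount.dfs.core.windows.net/dir/file").unwrap();
    assert_eq!(
        &adls[Position::BeforeUsername..Position::AfterPort],
        "data@myaccount.dfs.core.windows.net"
    );

    // No authority component: the slice is empty, which is exactly what the
    // new `polars_bail!` guard in `CloudLocation::from_url` rejects.
    let local = Url::parse("file:///tmp/data.parquet").unwrap();
    assert_eq!(&local[Position::BeforeUsername..Position::AfterPort], "");
}
```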
diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs
index 22e666a8198b..f7b47412f167 100644
--- a/crates/polars-io/src/cloud/object_store_setup.rs
+++ b/crates/polars-io/src/cloud/object_store_setup.rs
@@ -7,6 +7,7 @@ use polars_core::config;
 use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
 use polars_utils::aliases::PlHashMap;
 use polars_utils::pl_str::PlSmallStr;
+use polars_utils::{format_pl_smallstr, pl_serialize};
 use tokio::sync::RwLock;
 use url::Url;
 
@@ -17,7 +18,7 @@ use crate::cloud::CloudConfig;
 /// get rate limited when querying the DNS (can take up to 5s).
 /// Other reasons are connection pools that must be shared between as much as possible.
 #[allow(clippy::type_complexity)]
-static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, PolarsObjectStore>>> =
+static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
     Lazy::new(Default::default);
 
 #[allow(dead_code)]
@@ -29,10 +30,10 @@ fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn Obje
-fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
+fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> Vec<u8> {
     #[derive(Clone, Debug, PartialEq, Hash, Eq)]
-    #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
-    struct S {
+    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
+    struct C {
         max_retries: usize,
         #[cfg(feature = "file_cache")]
         file_cache_ttl: u64,
@@ -41,8 +42,15 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
         credential_provider: usize,
     }
 
+    #[derive(Clone, Debug, PartialEq, Hash, Eq)]
+    #[cfg_attr(feature = "serde", derive(serde::Serialize))]
+    struct S {
+        url_base: PlSmallStr,
+        cloud_options: Option<C>,
+    }
+
     // We include credentials as they can expire, so users will send new credentials for the same url.
-    let creds = serde_json::to_string(&options.map(
+    let cloud_options = options.map(
         |CloudOptions {
              // Destructure to ensure this breaks if anything changes.
             max_retries,
@@ -52,7 +60,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
             #[cfg(feature = "cloud")]
             credential_provider,
         }| {
-            S {
+            C {
                 max_retries: *max_retries,
                 #[cfg(feature = "file_cache")]
                 file_cache_ttl: *file_cache_ttl,
@@ -61,15 +69,21 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
                 credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
             }
         },
-    ))
-    .unwrap();
-
-    format!(
-        "{}://{}<\\creds\\>{}",
-        url.scheme(),
-        &url[url::Position::BeforeHost..url::Position::AfterPort],
-        creds
-    )
+    );
+
+    let cache_key = S {
+        url_base: format_pl_smallstr!(
+            "{}",
+            &url[url::Position::BeforeScheme..url::Position::AfterPort]
+        ),
+        cloud_options,
+    };
+
+    if config::verbose() {
+        eprintln!("object store cache key: {} {:?}", url, &cache_key);
+    }
+
+    pl_serialize::serialize_to_bytes(&cache_key).unwrap()
 }
 
 /// Construct an object_store `Path` from a string without any encoding/decoding.
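The cache key switches from a formatted `String` to the serialized bytes of a struct, so every field (including the nested options) participates in equality, and there is no delimiter-collision risk from string concatenation. A minimal sketch of the idea; `CacheKey` and `key_bytes` are hypothetical names, and `serde_json` stands in for the `pl_serialize::serialize_to_bytes` helper used above:

```rust
use std::collections::HashMap;

#[derive(serde::Serialize)]
struct CacheKey<'a> {
    url_base: &'a str,
    max_retries: usize,
}

// Serialize the structured key to bytes; the bytes are the map key.
fn key_bytes(key: &CacheKey) -> Vec<u8> {
    serde_json::to_vec(key).unwrap()
}

fn main() {
    let mut cache: HashMap<Vec<u8>, &'static str> = HashMap::new();

    let a = CacheKey { url_base: "s3://bucket", max_retries: 2 };
    cache.insert(key_bytes(&a), "object store handle");

    // Same fields, same bytes: cache hit.
    assert!(cache.contains_key(&key_bytes(&a)));

    // Any differing field produces a different key: cache miss.
    let b = CacheKey { url_base: "s3://bucket", max_retries: 3 };
    assert!(!cache.contains_key(&key_bytes(&b)));
}
```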
diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs
index de5f8903eabd..4159671204f8 100644
--- a/crates/polars-io/src/cloud/options.rs
+++ b/crates/polars-io/src/cloud/options.rs
@@ -25,8 +25,6 @@ use polars_error::*;
 #[cfg(feature = "aws")]
 use polars_utils::cache::FastFixedCache;
 #[cfg(feature = "aws")]
-use polars_utils::pl_str::PlSmallStr;
-#[cfg(feature = "aws")]
 use regex::Regex;
 #[cfg(feature = "http")]
 use reqwest::header::HeaderMap;
@@ -43,8 +41,11 @@ use crate::file_cache::get_env_file_cache_ttl;
 use crate::pl_async::with_concurrency_budget;
 
 #[cfg(feature = "aws")]
-static BUCKET_REGION: Lazy<std::sync::Mutex<FastFixedCache<PlSmallStr, PlSmallStr>>> =
-    Lazy::new(|| std::sync::Mutex::new(FastFixedCache::new(32)));
+static BUCKET_REGION: Lazy<
+    std::sync::Mutex<
+        FastFixedCache<polars_utils::pl_str::PlSmallStr, polars_utils::pl_str::PlSmallStr>,
+    >,
+> = Lazy::new(|| std::sync::Mutex::new(FastFixedCache::new(32)));
 
 /// The type of the config keys must satisfy the following requirements:
 /// 1. must be easily collected into a HashMap, the type required by the object_crate API.
@@ -406,16 +407,20 @@ impl CloudOptions {
     pub fn build_azure(&self, url: &str) -> PolarsResult<impl ObjectStore> {
         use super::credential_provider::IntoCredentialProvider;
 
-        let mut builder = if self.credential_provider.is_none() {
-            MicrosoftAzureBuilder::from_env()
-        } else {
-            MicrosoftAzureBuilder::new()
-        };
+        let mut storage_account: Option<polars_utils::pl_str::PlSmallStr> = None;
+
+        // The credential provider `self.credential_provider` takes precedence if it is set. We
+        // still call `from_env()`, as it may pick up an environment-configured storage account name.
+        let mut builder = MicrosoftAzureBuilder::from_env();
         if let Some(options) = &self.config {
             let CloudConfig::Azure(options) = options else {
                 panic!("impl error: cloud type mismatch")
             };
             for (key, value) in options.iter() {
+                if key == &AzureConfigKey::AccountName {
+                    storage_account = Some(value.into());
+                }
                 builder = builder.with_config(*key, value);
             }
         }
@@ -425,8 +430,18 @@ impl CloudOptions {
             .with_url(url)
             .with_retry(get_retry_config(self.max_retries));
 
+        // Prefer the storage account embedded in the path.
+        storage_account = extract_adls_uri_storage_account(url)
+            .map(|x| x.into())
+            .or(storage_account);
+
         let builder = if let Some(v) = self.credential_provider.clone() {
             builder.with_credentials(v.into_azure_provider())
+        } else if let Some(v) = storage_account
+            .as_deref()
+            .and_then(get_azure_storage_account_key)
+        {
+            builder.with_access_key(v)
         } else {
             builder
         };
@@ -610,6 +625,99 @@ impl CloudOptions {
     }
 }
 
+/// ```text
+/// "abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/"
+///                      ^^^^^^^^^^^^^^^^^
+/// ```
+#[cfg(feature = "azure")]
+fn extract_adls_uri_storage_account(path: &str) -> Option<&str> {
+    Some(
+        path.split_once("://")?
+            .1
+            .split_once('/')?
+            .0
+            .split_once('@')?
+            .1
+            .split_once(".dfs.core.windows.net")?
+            .0,
+    )
+}
+
+/// Attempt to retrieve the storage account key for this account using the Azure CLI.
+#[cfg(feature = "azure")]
+fn get_azure_storage_account_key(account_name: &str) -> Option<String> {
+    if polars_core::config::verbose() {
+        eprintln!(
+            "get_azure_storage_account_key: storage_account_name: {}",
+            account_name
+        );
+    }
+
+    let mut cmd = if cfg!(target_family = "windows") {
+        // https://github.com/apache/arrow-rs/blob/565c24b8071269b02c3937e34c51eacf0f4cbad6/object_store/src/azure/credential.rs#L877-L894
+        let mut v = std::process::Command::new("cmd");
+        v.args([
+            "/C",
+            "az",
+            "storage",
+            "account",
+            "keys",
+            "list",
+            "--output",
+            "json",
+            "--account-name",
+            account_name,
+        ]);
+        v
+    } else {
+        let mut v = std::process::Command::new("az");
+        v.args([
+            "storage",
+            "account",
+            "keys",
+            "list",
+            "--output",
+            "json",
+            "--account-name",
+            account_name,
+        ]);
+        v
+    };
+
+    let json_resp = cmd
+        .output()
+        .ok()
+        .filter(|x| x.status.success())
+        .map(|x| String::from_utf8(x.stdout))?
+        .ok()?;
+
+    // [
+    //     {
+    //         "creationTime": "1970-01-01T00:00:00.000000+00:00",
+    //         "keyName": "key1",
+    //         "permissions": "FULL",
+    //         "value": "..."
+    //     },
+    //     {
+    //         "creationTime": "1970-01-01T00:00:00.000000+00:00",
+    //         "keyName": "key2",
+    //         "permissions": "FULL",
+    //         "value": "..."
+    //     }
+    // ]
+
+    #[derive(Debug, serde::Deserialize)]
+    struct S {
+        value: String,
+    }
+
+    let resp: Vec<S> = serde_json::from_str(&json_resp).ok()?;
+
+    let access_key = resp.into_iter().next()?.value;
+
+    Some(access_key)
+}
+
 #[cfg(feature = "cloud")]
 #[cfg(test)]
 mod tests {
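The new `extract_adls_uri_storage_account` helper is a pure function, so its behavior is easy to pin down with a couple of checks. A standalone copy for illustration (`extract_account` is a hypothetical name; the logic mirrors the helper above):

```rust
fn extract_account(path: &str) -> Option<&str> {
    Some(
        path.split_once("://")?                   // strip the scheme
            .1
            .split_once('/')?                     // keep only the authority part
            .0
            .split_once('@')?                     // drop the container name
            .1
            .split_once(".dfs.core.windows.net")? // strip the ADLS host suffix
            .0,
    )
}

fn main() {
    assert_eq!(
        extract_account("abfss://data@myaccount.dfs.core.windows.net/dir/file"),
        Some("myaccount")
    );
    // Non-ADLS URIs fall through to `None` at the first failing `split_once`.
    assert_eq!(extract_account("s3://bucket/key"), None);
}
```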
diff --git a/crates/polars-io/src/file_cache/entry.rs b/crates/polars-io/src/file_cache/entry.rs
index 137d321d6a70..63831554e86e 100644
--- a/crates/polars-io/src/file_cache/entry.rs
+++ b/crates/polars-io/src/file_cache/entry.rs
@@ -4,6 +4,7 @@ use std::sync::atomic::AtomicU64;
 use std::sync::{Arc, Mutex};
 
 use fs4::fs_std::FileExt;
+use once_cell::sync::Lazy;
 use polars_core::config;
 use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
 
@@ -151,13 +152,38 @@ impl Inner {
                 .truncate(true)
                 .open(data_file_path)
                 .map_err(PolarsError::from)?;
+
+            static IGNORE_ERR: Lazy<bool> = Lazy::new(|| {
+                let v =
+                    std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() == Ok("1");
+                if config::verbose() {
+                    eprintln!(
+                        "[file_cache]: POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR: {}",
+                        v
+                    );
+                }
+                v
+            });
+
+            // Initialize it here to trigger the verbose print.
+            let _ = *IGNORE_ERR;
+
             file.lock_exclusive().unwrap();
-            if file.allocate(remote_metadata.size).is_err() {
-                polars_bail!(
-                    ComputeError: "failed to allocate {} bytes to download uri = {}",
+            if let Err(e) = file.allocate(remote_metadata.size) {
+                let msg = format!(
+                    "failed to reserve {} bytes on disk to download uri = {}: {:?}",
                     remote_metadata.size,
-                    self.uri.as_ref()
+                    self.uri.as_ref(),
+                    e
                 );
+
+                if *IGNORE_ERR {
+                    if config::verbose() {
+                        eprintln!("[file_cache]: warning: {}", msg);
+                    }
+                } else {
+                    polars_bail!(ComputeError: msg);
+                }
             }
         }
         self.file_fetcher.fetch(data_file_path)?;
diff --git a/crates/polars-ops/src/frame/join/args.rs b/crates/polars-ops/src/frame/join/args.rs
index 1b124a909e0b..e2df38daf88b 100644
--- a/crates/polars-ops/src/frame/join/args.rs
+++ b/crates/polars-ops/src/frame/join/args.rs
@@ -81,6 +81,18 @@ pub enum MaintainOrderJoin {
     RightLeft,
 }
 
+impl MaintainOrderJoin {
+    pub(super) fn flip(&self) -> Self {
+        match self {
+            MaintainOrderJoin::None => MaintainOrderJoin::None,
+            MaintainOrderJoin::Left => MaintainOrderJoin::Right,
+            MaintainOrderJoin::Right => MaintainOrderJoin::Left,
+            MaintainOrderJoin::LeftRight => MaintainOrderJoin::RightLeft,
+            MaintainOrderJoin::RightLeft => MaintainOrderJoin::LeftRight,
+        }
+    }
+}
+
 impl Default for JoinArgs {
     fn default() -> Self {
         Self {
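Context for the next file: a right join is executed as a left join with the inputs swapped, so any `maintain_order` requirement expressed against the original inputs has to be mirrored first, which is what the new `flip` provides. A self-contained sketch of that reasoning (the enum and `flip` are copied from the hunk above; `main` is illustrative):

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum MaintainOrderJoin {
    None,
    Left,
    Right,
    LeftRight,
    RightLeft,
}

impl MaintainOrderJoin {
    fn flip(&self) -> Self {
        match self {
            MaintainOrderJoin::None => MaintainOrderJoin::None,
            MaintainOrderJoin::Left => MaintainOrderJoin::Right,
            MaintainOrderJoin::Right => MaintainOrderJoin::Left,
            MaintainOrderJoin::LeftRight => MaintainOrderJoin::RightLeft,
            MaintainOrderJoin::RightLeft => MaintainOrderJoin::LeftRight,
        }
    }
}

fn main() {
    // A right join that should keep the order of its original left input is
    // run as a swapped left join, which must then keep its *right* input's order.
    assert_eq!(MaintainOrderJoin::Left.flip(), MaintainOrderJoin::Right);
    // Flipping twice is a no-op.
    assert_eq!(
        MaintainOrderJoin::LeftRight.flip().flip(),
        MaintainOrderJoin::LeftRight
    );
}
```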
diff --git a/crates/polars-ops/src/frame/join/dispatch_left_right.rs b/crates/polars-ops/src/frame/join/dispatch_left_right.rs
index 577e1dd353eb..452c51154a64 100644
--- a/crates/polars-ops/src/frame/join/dispatch_left_right.rs
+++ b/crates/polars-ops/src/frame/join/dispatch_left_right.rs
@@ -21,11 +21,12 @@ pub(super) fn right_join_from_series(
     right: DataFrame,
     s_left: &Series,
     s_right: &Series,
-    args: JoinArgs,
+    mut args: JoinArgs,
     verbose: bool,
     drop_names: Option<Vec<PlSmallStr>>,
 ) -> PolarsResult<DataFrame> {
     // Swap the order of tables to do a right join.
+    args.maintain_order = args.maintain_order.flip();
     let (df_right, df_left) = materialize_left_join_from_series(
         right, left, s_right, s_left, &args, verbose, drop_names,
     )?;
diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
index 34f03e6debdd..0ad4bb6bb0ee 100644
--- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
+++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -163,21 +163,19 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
             let sources = match &scan_type {
                 #[cfg(feature = "parquet")]
-                FileScan::Parquet {
-                    ref cloud_options, ..
-                } => sources
+                FileScan::Parquet { cloud_options, .. } => sources
                     .expand_paths_with_hive_update(&mut file_options, cloud_options.as_ref())?,
                 #[cfg(feature = "ipc")]
-                FileScan::Ipc {
-                    ref cloud_options, ..
-                } => sources
+                FileScan::Ipc { cloud_options, .. } => sources
                     .expand_paths_with_hive_update(&mut file_options, cloud_options.as_ref())?,
                 #[cfg(feature = "csv")]
-                FileScan::Csv {
-                    ref cloud_options, ..
-                } => sources.expand_paths(&file_options, cloud_options.as_ref())?,
+                FileScan::Csv { cloud_options, .. } => {
+                    sources.expand_paths(&file_options, cloud_options.as_ref())?
+                },
                 #[cfg(feature = "json")]
-                FileScan::NDJson { .. } => sources.expand_paths(&file_options, None)?,
+                FileScan::NDJson { cloud_options, .. } => {
+                    sources.expand_paths(&file_options, cloud_options.as_ref())?
+                },
                 FileScan::Anonymous { .. } => sources,
             };
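A side note on the pattern changes above: the explicit `ref` binders can be dropped because the scrutinee `&scan_type` is already a reference, so match ergonomics binds the fields by reference automatically. A minimal sketch with hypothetical types:

```rust
struct Opts {
    retries: usize,
}

enum Scan {
    Parquet { cloud_options: Option<Opts> },
}

fn get(scan: &Scan) -> Option<&Opts> {
    match scan {
        // `cloud_options` is bound as `&Option<Opts>` here; no `ref` needed.
        Scan::Parquet { cloud_options } => cloud_options.as_ref(),
    }
}

fn main() {
    let s = Scan::Parquet {
        cloud_options: Some(Opts { retries: 3 }),
    };
    assert_eq!(get(&s).map(|o| o.retries), Some(3));
}
```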
diff --git a/crates/polars-stream/src/nodes/joins/equi_join.rs b/crates/polars-stream/src/nodes/joins/equi_join.rs
index 8afc7c88b167..c0c072d51d8b 100644
--- a/crates/polars-stream/src/nodes/joins/equi_join.rs
+++ b/crates/polars-stream/src/nodes/joins/equi_join.rs
@@ -202,18 +202,29 @@ impl BuildState {
     }
 
     fn finalize(&mut self, params: &EquiJoinParams, table: &dyn ChunkedIdxTable) -> ProbeState {
-        let num_partitions = self.partitions_per_worker.len();
+        // Transpose the per-worker results into per-partition result lists.
+        let num_workers = self.partitions_per_worker.len();
+        let num_partitions = self.partitions_per_worker[0].len();
+        let mut results_per_partition = (0..num_partitions)
+            .map(|_| Vec::with_capacity(num_workers))
+            .collect_vec();
+        for worker in self.partitions_per_worker.drain(..) {
+            for (p, result) in worker.into_iter().enumerate() {
+                results_per_partition[p].push(result);
+            }
+        }
+
         let track_unmatchable = params.emit_unmatched_build();
-        let table_per_partition: Vec<_> = (0..num_partitions)
+        let table_per_partition: Vec<_> = results_per_partition
             .into_par_iter()
             .with_max_len(1)
-            .map(|p| {
+            .map(|results| {
                 // Estimate sizes and cardinality.
                 let mut sketch = CardinalitySketch::new();
                 let mut num_frames = 0;
-                for worker in &self.partitions_per_worker {
-                    sketch.combine(worker[p].sketch.as_ref().unwrap());
-                    num_frames += worker[p].frames.len();
+                for result in &results {
+                    sketch.combine(result.sketch.as_ref().unwrap());
+                    num_frames += result.frames.len();
                 }
 
                 // Build table for this partition.
@@ -223,9 +234,9 @@ impl BuildState {
                 table.reserve(sketch.estimate() * 5 / 4);
                 if params.preserve_order_build {
                     let mut combined = Vec::with_capacity(num_frames);
-                    for worker in &self.partitions_per_worker {
+                    for result in results {
                         for (hash_keys, (seq, frame)) in
-                            worker[p].hash_keys.iter().zip(&worker[p].frames)
+                            result.hash_keys.into_iter().zip(result.frames)
                         {
                             combined.push((seq, hash_keys, frame));
                         }
@@ -239,14 +250,14 @@ impl BuildState {
                             continue;
                         }
 
-                        table.insert_key_chunk(hash_keys.clone(), track_unmatchable);
-                        combined_frames.push(frame.clone());
-                        chunk_seq_ids.push(*seq);
+                        table.insert_key_chunk(hash_keys, track_unmatchable);
+                        combined_frames.push(frame);
+                        chunk_seq_ids.push(seq);
                     }
                 } else {
-                    for worker in &self.partitions_per_worker {
+                    for result in results {
                         for (hash_keys, (_, frame)) in
-                            worker[p].hash_keys.iter().zip(&worker[p].frames)
+                            result.hash_keys.into_iter().zip(result.frames)
                         {
                             // Zero-sized chunks can get deleted, so skip entirely to avoid messing
                             // up the chunk counter.
@@ -254,8 +265,8 @@ impl BuildState {
                             continue;
                         }
 
-                        table.insert_key_chunk(hash_keys.clone(), track_unmatchable);
-                        combined_frames.push(frame.clone());
+                        table.insert_key_chunk(hash_keys, track_unmatchable);
+                        combined_frames.push(frame);
                     }
                 }
             }
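The transpose in `finalize` turns worker-major results into partition-major ones, which is what lets each partition be finalized in parallel and consumed by value (removing the per-chunk `.clone()` calls further down). A generic sketch of the same loop structure, with hypothetical names:

```rust
fn transpose<T>(mut per_worker: Vec<Vec<T>>) -> Vec<Vec<T>> {
    let num_workers = per_worker.len();
    let num_partitions = per_worker[0].len();
    let mut per_partition: Vec<Vec<T>> = (0..num_partitions)
        .map(|_| Vec::with_capacity(num_workers))
        .collect();
    // Drain each worker's row so the elements move (no clones) into the
    // partition-major layout.
    for worker in per_worker.drain(..) {
        for (p, result) in worker.into_iter().enumerate() {
            per_partition[p].push(result);
        }
    }
    per_partition
}

fn main() {
    // 3 workers, each holding results for 2 partitions.
    let per_worker = vec![vec![1, 2], vec![3, 4], vec![5, 6]];
    assert_eq!(transpose(per_worker), vec![vec![1, 3, 5], vec![2, 4, 6]]);
}
```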
diff --git a/py-polars/polars/_utils/various.py b/py-polars/polars/_utils/various.py
index 3f66c4a26a65..4f72d8947765 100644
--- a/py-polars/polars/_utils/various.py
+++ b/py-polars/polars/_utils/various.py
@@ -297,6 +297,8 @@ def _cast_repr_strings_with_schema(
         msg = f"DataFrame should contain only String repr data; found {tp!r}"
         raise TypeError(msg)

+    special_floats = {"-inf", "+inf", "inf", "nan"}
+
    # duration string scaling
    ns_sec = 1_000_000_000
    duration_scaling = {
@@ -366,8 +368,11 @@ def str_duration_(td: str | None) -> int | None:
                integer_part = F.col(c).str.replace(r"^(.*)\D(\d*)$", "$1")
                fractional_part = F.col(c).str.replace(r"^(.*)\D(\d*)$", "$2")
                cast_cols[c] = (
-                    # check for empty string and/or integer format
-                    pl.when(F.col(c).str.contains(r"^[+-]?\d*$"))
+                    # check for empty string, special floats, or integer format
+                    pl.when(
+                        F.col(c).str.contains(r"^[+-]?\d*$")
+                        | F.col(c).str.to_lowercase().is_in(special_floats)
+                    )
                    .then(pl.when(F.col(c).str.len_bytes() > 0).then(F.col(c)))
                    # check for scientific notation
                    .when(F.col(c).str.contains("[eE]"))
diff --git a/py-polars/polars/convert/general.py b/py-polars/polars/convert/general.py
index 13256d925c18..7eb492ee9d13 100644
--- a/py-polars/polars/convert/general.py
+++ b/py-polars/polars/convert/general.py
@@ -760,7 +760,12 @@ def _from_dataframe_repr(m: re.Match[str]) -> DataFrame:
            buf = io.BytesIO()
            df.write_csv(file=buf)
            buf.seek(0)
-            df = read_csv(buf, new_columns=df.columns, try_parse_dates=True)
+            df = read_csv(
+                buf,
+                new_columns=df.columns,
+                try_parse_dates=True,
+                infer_schema_length=None,
+            )
            return df
        elif schema and not data:
            return df.cast(schema)  # type: ignore[arg-type]
@@ -786,8 +791,16 @@ def _from_series_repr(m: re.Match[str]) -> Series:
    ]
    if string_values == ["[", "]"]:
        string_values = []
-    elif string_values and string_values[0].lstrip("#> ") == "[":
-        string_values = string_values[1:]
+    else:
+        start: int | None = None
+        end: int | None = None
+        for idx, v in enumerate(string_values):
+            if start is None and v.lstrip("#> ") == "[":
+                start = idx
+            if v.lstrip("#> ") == "]":
+                end = idx
+        if start is not None and end is not None:
+            string_values = string_values[start + 1 : end]

    values = string_values[:length] if length > 0 else string_values
    values = [(None if v == "null" else v) for v in values if v not in ("…", "...")]
diff --git a/py-polars/tests/unit/interop/test_interop.py b/py-polars/tests/unit/interop/test_interop.py
index 2b3e11514404..90d3ab4a85c9 100644
--- a/py-polars/tests/unit/interop/test_interop.py
+++ b/py-polars/tests/unit/interop/test_interop.py
@@ -616,6 +616,33 @@ def test_series_from_repr() -> None:
    )
    assert_series_equal(s, pl.Series("flt", [], dtype=pl.Float32))

+    s = cast(
+        pl.Series,
+        pl.from_repr(
+            """
+            Series: 'flt' [f64]
+            [
+                null
+                +inf
+                -inf
+                inf
+                0.0
+                NaN
+            ]
+            >>> print("stuff")
+            """
+        ),
+    )
+    inf, nan = float("inf"), float("nan")
+    assert_series_equal(
+        s,
+        pl.Series(
+            name="flt",
+            dtype=pl.Float64,
+            values=[None, inf, -inf, inf, 0.0, nan],
+        ),
+    )
+

 def test_dataframe_from_repr_custom_separators() -> None:
    # repr created with custom digit-grouping
diff --git a/py-polars/tests/unit/operations/test_join.py b/py-polars/tests/unit/operations/test_join.py
index c5a01c5ef603..9a3b763b5f6f 100644
--- a/py-polars/tests/unit/operations/test_join.py
+++ b/py-polars/tests/unit/operations/test_join.py
@@ -1243,6 +1243,20 @@ def test_join_preserve_order_left() -> None:
        5,
    ]

+    right_left = left.join(right, on="a", how="right", maintain_order="left").collect()
+    assert right_left.get_column("a").cast(pl.UInt32).to_list() == [2, 1, 1, None, 6]
+
+    right_right = left.join(
+        right, on="a", how="right", maintain_order="right"
+    ).collect()
+    assert right_right.get_column("a").cast(pl.UInt32).to_list() == [
+        1,
+        1,
+        None,
+        2,
+        6,
+    ]
+

 def test_join_preserve_order_full() -> None:
    left = pl.LazyFrame({"a": [None, 2, 1, 1, 5]})