diff --git a/Cargo.lock b/Cargo.lock
index 4f105dc1b4968..ca5aba8839a8d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2022,6 +2022,7 @@ dependencies = [
  "arrow",
  "async-trait",
  "bytes",
+ "criterion",
  "datafusion-common",
  "datafusion-common-runtime",
  "datafusion-datasource",
diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml
index 37fa8d43a0816..c4a75e4eb880b 100644
--- a/datafusion/datasource-json/Cargo.toml
+++ b/datafusion/datasource-json/Cargo.toml
@@ -46,6 +46,9 @@ futures = { workspace = true }
 object_store = { workspace = true }
 tokio = { workspace = true }
 
+[dev-dependencies]
+criterion = { workspace = true }
+
 # Note: add additional linter rules in lib.rs.
 # Rust does not support workspace + new linter rules in subcrates yet
 # https://github.com/rust-lang/cargo/issues/13157
@@ -55,3 +58,7 @@ workspace = true
 [lib]
 name = "datafusion_datasource_json"
 path = "src/mod.rs"
+
+[[bench]]
+name = "json_boundary"
+harness = false
diff --git a/datafusion/datasource-json/benches/json_boundary.rs b/datafusion/datasource-json/benches/json_boundary.rs
new file mode 100644
index 0000000000000..6bfd329588a0a
--- /dev/null
+++ b/datafusion/datasource-json/benches/json_boundary.rs
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema};
+use async_trait::async_trait;
+use bytes::Bytes;
+use criterion::{Criterion, Throughput, criterion_group, criterion_main};
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_groups::FileGroup;
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
+use datafusion_datasource_json::source::JsonSource;
+use datafusion_execution::TaskContext;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use futures::stream::BoxStream;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{
+    GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectStore,
+    PutMultipartOptions, PutOptions, PutPayload, PutResult,
+};
+use std::fmt;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
+use tokio::runtime::{Builder, Runtime};
+
+// Add CPU cost per requested KB to make read amplification visible in timings.
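+//
+// Illustrative arithmetic (an editorial sketch, not part of the original
+// comment): with CPU_COST_PER_KB_ROUNDS = 64, a 256 KiB ranged GET burns
+// 256 * 64 = 16,384 checksum rounds in `burn_cpu_kb`, so a code path that
+// requests 10x more bytes also pays roughly 10x more synthetic CPU.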
+const CPU_COST_PER_KB_ROUNDS: u32 = 64;
+const BYTES_PER_KB: u64 = 1024;
+
+#[derive(Debug)]
+struct CountingObjectStore {
+    inner: Arc<dyn ObjectStore>,
+    requested_bytes: AtomicU64,
+    cpu_cost_per_kb_rounds: u32,
+}
+
+impl CountingObjectStore {
+    fn new_with_cpu_cost(
+        inner: Arc<dyn ObjectStore>,
+        cpu_cost_per_kb_rounds: u32,
+    ) -> Self {
+        Self {
+            inner,
+            requested_bytes: AtomicU64::new(0),
+            cpu_cost_per_kb_rounds,
+        }
+    }
+
+    fn reset(&self) {
+        self.requested_bytes.store(0, Ordering::Relaxed);
+    }
+
+    fn requested_bytes(&self) -> u64 {
+        self.requested_bytes.load(Ordering::Relaxed)
+    }
+}
+
+impl fmt::Display for CountingObjectStore {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "CountingObjectStore({})", self.inner)
+    }
+}
+
+#[async_trait]
+impl ObjectStore for CountingObjectStore {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> object_store::Result<PutResult> {
+        self.inner.put_opts(location, payload, opts).await
+    }
+
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOptions,
+    ) -> object_store::Result<Box<dyn MultipartUpload>> {
+        self.inner.put_multipart_opts(location, opts).await
+    }
+
+    async fn get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> object_store::Result<GetResult> {
+        let should_burn_cpu = self.cpu_cost_per_kb_rounds > 0;
+        let mut requested_len = 0u64;
+        if let Some(range) = options.range.as_ref() {
+            let requested = match range {
+                GetRange::Bounded(r) => r.end.saturating_sub(r.start),
+                GetRange::Offset(_) | GetRange::Suffix(_) => 0,
+            };
+            requested_len = requested;
+            self.requested_bytes.fetch_add(requested, Ordering::Relaxed);
+        }
+        let result = self.inner.get_opts(location, options).await;
+        if should_burn_cpu {
+            burn_cpu_kb(requested_len, self.cpu_cost_per_kb_rounds);
+        }
+        result
+    }
+
+    async fn delete(&self, location: &Path) -> object_store::Result<()> {
+        self.inner.delete(location).await
+    }
+
+    fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> BoxStream<'static, object_store::Result<object_store::ObjectMeta>> {
+        self.inner.list(prefix)
+    }
+
+    async fn list_with_delimiter(
+        &self,
+        prefix: Option<&Path>,
+    ) -> object_store::Result<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(
+        &self,
+        from: &Path,
+        to: &Path,
+    ) -> object_store::Result<()> {
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
+
+fn build_fixed_json_lines(line_len: usize, lines: usize) -> Bytes {
+    let prefix = r#"{"value":""#;
+    let suffix = "\"}\n";
+    let min_len = prefix.len() + suffix.len() + 1;
+    assert!(line_len >= min_len, "line_len must be at least {min_len}");
+
+    let padding_len = line_len - prefix.len() - suffix.len();
+    let mut line = Vec::with_capacity(line_len);
+    line.extend_from_slice(prefix.as_bytes());
+    line.extend(std::iter::repeat_n(b'a', padding_len));
+    line.extend_from_slice(suffix.as_bytes());
+
+    let mut data = Vec::with_capacity(line_len * lines);
+    for _ in 0..lines {
+        data.extend_from_slice(&line);
+    }
+    Bytes::from(data)
+}
+
+fn burn_cpu_kb(bytes: u64, rounds: u32) {
+    if bytes == 0 || rounds == 0 {
+        return;
+    }
+    let kb = bytes.div_ceil(BYTES_PER_KB);
+    let mut checksum = 0u64;
+    let mut remaining = kb.saturating_mul(rounds as u64);
+    while remaining > 0 {
+        checksum = checksum.wrapping_add(remaining);
+        checksum = checksum.rotate_left(5) ^ 0x9e3779b97f4a7c15;
+        remaining -= 1;
+    }
+    std::hint::black_box(checksum);
+}
+
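+// Illustrative check of the fixture's data layout (an editorial note, not part
+// of the benchmark logic): build_fixed_json_lines(16, 1) yields exactly
+// b"{\"value\":\"aaa\"}\n", a 16-byte record, so any offset that is a
+// multiple of `line_len` falls on a record boundary.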
+struct Fixture {
+    store: Arc<CountingObjectStore>,
+    task_ctx: Arc<TaskContext>,
+    exec: Arc<dyn ExecutionPlan>,
+}
+
+fn build_fixture(rt: &Runtime) -> Fixture {
+    let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+    let store = Arc::new(CountingObjectStore::new_with_cpu_cost(
+        Arc::clone(&inner),
+        CPU_COST_PER_KB_ROUNDS,
+    ));
+    let store_dyn: Arc<dyn ObjectStore> = store.clone();
+    let path = Path::from("bench.json");
+
+    let line_len = 128usize;
+    let lines = 65_536usize;
+    let data = build_fixed_json_lines(line_len, lines);
+    rt.block_on(inner.put(&path, data.into())).unwrap();
+    let object_meta = rt.block_on(inner.head(&path)).unwrap();
+
+    let start = 1_000_003usize;
+    let raw_end = start + 256_000;
+    let end = (raw_end / line_len).max(1) * line_len;
+
+    let task_ctx = Arc::new(TaskContext::default());
+    let runtime_env = task_ctx.runtime_env();
+    let object_store_url = ObjectStoreUrl::parse("test://bucket").unwrap();
+    // Register a CPU-costed store to approximate non-streaming remote reads.
+    runtime_env.register_object_store(object_store_url.as_ref(), Arc::clone(&store_dyn));
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "value",
+        DataType::Utf8,
+        false,
+    )]));
+    let table_schema = TableSchema::from_file_schema(schema);
+    let file_source: Arc<dyn FileSource> = Arc::new(JsonSource::new(table_schema));
+    let file = build_partitioned_file(object_meta.clone(), start, end);
+    let config = FileScanConfigBuilder::new(object_store_url, file_source)
+        .with_file_groups(vec![FileGroup::new(vec![file])])
+        .build();
+    let exec: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
+
+    Fixture {
+        store,
+        task_ctx,
+        exec,
+    }
+}
+
+fn measure_datasource_exec_bytes(rt: &Runtime, fixture: &Fixture) -> u64 {
+    fixture.store.reset();
+    let rows = rt.block_on(run_datasource_exec(
+        Arc::clone(&fixture.exec),
+        Arc::clone(&fixture.task_ctx),
+    ));
+    debug_assert!(rows > 0);
+    fixture.store.requested_bytes()
+}
+
+fn build_partitioned_file(
+    object_meta: object_store::ObjectMeta,
+    start: usize,
+    end: usize,
+) -> PartitionedFile {
+    PartitionedFile {
+        object_meta,
+        partition_values: vec![],
+        range: Some(FileRange {
+            start: start as i64,
+            end: end as i64,
+        }),
+        statistics: None,
+        ordering: None,
+        extensions: None,
+        metadata_size_hint: None,
+    }
+}
+
+async fn run_datasource_exec(
+    exec: Arc<dyn ExecutionPlan>,
+    task_ctx: Arc<TaskContext>,
+) -> usize {
+    let mut stream = exec.execute(0, task_ctx).unwrap();
+    let mut rows = 0;
+    while let Some(batch) = stream.next().await {
+        let batch = batch.unwrap();
+        rows += batch.num_rows();
+    }
+    rows
+}
+
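+// To reproduce locally (an editorial note; this assumes the crate is published
+// under the name `datafusion-datasource-json`, as its directory suggests):
+//
+//   cargo bench -p datafusion-datasource-json --bench json_boundary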
+fn bench_json_boundary(c: &mut Criterion) {
+    let rt = Builder::new_current_thread().build().unwrap();
+    let fixture = build_fixture(&rt);
+
+    let exec_bytes = measure_datasource_exec_bytes(&rt, &fixture);
+
+    let mut exec_group = c.benchmark_group("json_boundary_datasource_exec");
+    exec_group.throughput(Throughput::Bytes(exec_bytes));
+    // Fixed benchmark id for baseline comparisons; read_bytes is reported as throughput.
+    exec_group.bench_function("execute", |b| {
+        b.iter(|| {
+            fixture.store.reset();
+            rt.block_on(run_datasource_exec(
+                Arc::clone(&fixture.exec),
+                Arc::clone(&fixture.task_ctx),
+            ));
+        });
+    });
+    exec_group.finish();
+}
+
+criterion_group!(benches, bench_json_boundary);
+criterion_main!(benches);
diff --git a/datafusion/datasource-json/src/boundary_utils.rs b/datafusion/datasource-json/src/boundary_utils.rs
new file mode 100644
index 0000000000000..1184de9c684f6
--- /dev/null
+++ b/datafusion/datasource-json/src/boundary_utils.rs
@@ -0,0 +1,484 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use datafusion_common::{DataFusionError, Result};
+use object_store::{ObjectStore, path::Path};
+use std::sync::Arc;
+
+pub const DEFAULT_BOUNDARY_WINDOW: u64 = 4096; // 4KB
+
+/// Fetch bytes for [start, end) and align boundaries in memory.
+///
+/// Start alignment:
+/// - If start == 0, use bytes as-is.
+/// - Else, check byte at start-1 (included in fetch). If it is the terminator,
+///   start from `start`. Otherwise scan forward in memory for the first terminator
+///   and start after it. If no terminator exists in the fetched range, return None.
+///
+/// End alignment:
+/// - If the last byte is not the terminator and end < file_size, fetch forward in
+///   chunks until the terminator is found or EOF is reached.
+pub async fn get_aligned_bytes(
+    store: &Arc<dyn ObjectStore>,
+    location: &Path,
+    start: u64,
+    end: u64,
+    file_size: u64,
+    terminator: u8,
+    scan_window: u64,
+) -> Result<Option<Bytes>> {
+    if scan_window == 0 {
+        return Err(DataFusionError::Internal(
+            "scan_window must be greater than 0".to_string(),
+        ));
+    }
+
+    if start >= end || start >= file_size {
+        return Ok(None);
+    }
+
+    let fetch_start = start.saturating_sub(1);
+    let fetch_end = std::cmp::min(end, file_size);
+    let bytes = store
+        .get_range(location, fetch_start..fetch_end)
+        .await
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+    if bytes.is_empty() {
+        return Ok(None);
+    }
+
+    let data_offset = if start == 0 {
+        0
+    } else if bytes[0] == terminator {
+        1
+    } else {
+        match bytes[1..].iter().position(|&b| b == terminator) {
+            Some(pos) => pos + 2,
+            None => return Ok(None),
+        }
+    };
+
+    if data_offset >= bytes.len() {
+        return Ok(None);
+    }
+
+    let data = bytes.slice(data_offset..);
+
+    // Fast path: if already aligned, return zero-copy
+    if fetch_end >= file_size || data.last() == Some(&terminator) {
+        return Ok(Some(data));
+    }
+
+    // Slow path: need to extend, preallocate capacity
+    let scan_window_usize = usize::try_from(scan_window).map_err(|_| {
+        DataFusionError::Internal("scan_window must fit in usize".to_string())
+    })?;
+    let mut buffer = Vec::with_capacity(data.len().saturating_add(scan_window_usize));
+    buffer.extend_from_slice(&data);
+    let mut cursor = fetch_end;
+
+    while cursor < file_size {
+        let chunk_end = std::cmp::min(cursor.saturating_add(scan_window), file_size);
+        let chunk = store
+            .get_range(location, cursor..chunk_end)
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        if chunk.is_empty() {
+            break;
+        }
+
+        if let Some(pos) = chunk.iter().position(|&b| b == terminator) {
+            buffer.extend_from_slice(&chunk[..=pos]);
+            return Ok(Some(Bytes::from(buffer)));
+        }
+
+        buffer.extend_from_slice(&chunk);
+        cursor = chunk_end;
+    }
+
+    Ok(Some(Bytes::from(buffer)))
+}
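+
+// Worked example (an editorial illustration, mirroring the tests below): for
+// the 9-byte file b"a\nbb\nccc\n", calling
+// get_aligned_bytes(&store, &path, 2, 6, 9, b'\n', 4096) fetches [1, 6),
+// finds the terminator at offset 1 (the byte just before `start`), so data
+// begins at offset 2; the fetched range ends mid-record, so one forward chunk
+// extends it through the '\n' at offset 8, yielding Some(b"bb\nccc\n").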
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use async_trait::async_trait;
+    use datafusion_datasource::{FileRange, PartitionedFile, calculate_range};
+    use futures::stream::BoxStream;
+    use object_store::memory::InMemory;
+    use object_store::{
+        GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectStore,
+        PutMultipartOptions, PutOptions, PutPayload, PutResult,
+    };
+    use std::fmt;
+    use std::sync::atomic::{AtomicU64, Ordering};
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_start_at_beginning() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store
+            .put(&path, "line1\nline2\nline3\n".into())
+            .await
+            .unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 0, 6, 18, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line1\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_start_aligned() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // "line1\nline2\nline3\n"
+        // Position 6 is right after first \n
+        store
+            .put(&path, "line1\nline2\nline3\n".into())
+            .await
+            .unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 6, 12, 18, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line2\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_start_needs_alignment() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // "line1\nline2\nline3\n"
+        // Position 8 is in the middle of "line2"
+        store
+            .put(&path, "line1\nline2\nline3\n".into())
+            .await
+            .unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 8, 18, 18, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line3\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_no_newline_in_range() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store.put(&path, "abcdefghij\n".into()).await.unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 2, 8, 11, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert!(result.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_extend_end() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // "line1\nline2\nline3\n"
+        store
+            .put(&path, "line1\nline2\nline3\n".into())
+            .await
+            .unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 0, 8, 18, b'\n', 2)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line1\nline2\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_end_at_eof_without_newline() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store.put(&path, "line1".into()).await.unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 0, 5, 5, b'\n', 4)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"line1");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_rejects_zero_scan_window() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store.put(&path, "line1\n".into()).await.unwrap();
+
+        let err = get_aligned_bytes(&store, &path, 0, 6, 6, b'\n', 0)
+            .await
+            .unwrap_err();
+
+        assert!(
+            matches!(err, DataFusionError::Internal(ref msg) if msg.contains("scan_window")),
+            "unexpected error: {err}"
+        );
+    }
+
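+    // Test-local counting wrapper (apparently duplicated from the benchmark so
+    // the unit tests stay self-contained): it records how many bytes each
+    // bounded range request asks for, which lets the final test compare the
+    // read amplification of the old and new code paths.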
+    #[derive(Debug)]
+    struct CountingObjectStore {
+        inner: Arc<dyn ObjectStore>,
+        requested_bytes: AtomicU64,
+        requested_calls: AtomicU64,
+    }
+
+    impl CountingObjectStore {
+        fn new(inner: Arc<dyn ObjectStore>) -> Self {
+            Self {
+                inner,
+                requested_bytes: AtomicU64::new(0),
+                requested_calls: AtomicU64::new(0),
+            }
+        }
+
+        fn reset(&self) {
+            self.requested_bytes.store(0, Ordering::Relaxed);
+            self.requested_calls.store(0, Ordering::Relaxed);
+        }
+
+        fn requested_bytes(&self) -> u64 {
+            self.requested_bytes.load(Ordering::Relaxed)
+        }
+    }
+
+    impl fmt::Display for CountingObjectStore {
+        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+            write!(f, "CountingObjectStore({})", self.inner)
+        }
+    }
+
+    #[async_trait]
+    impl ObjectStore for CountingObjectStore {
+        async fn put_opts(
+            &self,
+            location: &Path,
+            payload: PutPayload,
+            opts: PutOptions,
+        ) -> object_store::Result<PutResult> {
+            self.inner.put_opts(location, payload, opts).await
+        }
+
+        async fn put_multipart_opts(
+            &self,
+            location: &Path,
+            opts: PutMultipartOptions,
+        ) -> object_store::Result<Box<dyn MultipartUpload>> {
+            self.inner.put_multipart_opts(location, opts).await
+        }
+
+        async fn get_opts(
+            &self,
+            location: &Path,
+            options: GetOptions,
+        ) -> object_store::Result<GetResult> {
+            if let Some(range) = options.range.as_ref() {
+                let requested = match range {
+                    GetRange::Bounded(r) => r.end.saturating_sub(r.start),
+                    GetRange::Offset(_) | GetRange::Suffix(_) => 0,
+                };
+                self.requested_bytes.fetch_add(requested, Ordering::Relaxed);
+            }
+            self.requested_calls.fetch_add(1, Ordering::Relaxed);
+            self.inner.get_opts(location, options).await
+        }
+
+        async fn delete(&self, location: &Path) -> object_store::Result<()> {
+            self.inner.delete(location).await
+        }
+
+        fn list(
+            &self,
+            prefix: Option<&Path>,
+        ) -> BoxStream<'static, object_store::Result<object_store::ObjectMeta>> {
+            self.inner.list(prefix)
+        }
+
+        async fn list_with_delimiter(
+            &self,
+            prefix: Option<&Path>,
+        ) -> object_store::Result<ListResult> {
+            self.inner.list_with_delimiter(prefix).await
+        }
+
+        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
+            self.inner.copy(from, to).await
+        }
+
+        async fn copy_if_not_exists(
+            &self,
+            from: &Path,
+            to: &Path,
+        ) -> object_store::Result<()> {
+            self.inner.copy_if_not_exists(from, to).await
+        }
+    }
+
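+    /// Builds `lines` newline-terminated lines of exactly `line_len` bytes
+    /// each; e.g. `build_fixed_lines(4, 2)` yields `b"aaa\naaa\n"`. (Doc
+    /// comment added editorially for clarity; not part of the test logic.)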
+    fn build_fixed_lines(line_len: usize, lines: usize) -> Bytes {
+        let body_len = line_len.saturating_sub(1);
+        let mut data = Vec::with_capacity(line_len * lines);
+        for _ in 0..lines {
+            data.extend(std::iter::repeat_n(b'a', body_len));
+            data.push(b'\n');
+        }
+        Bytes::from(data)
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_start_equals_end() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store.put(&path, "line1\n".into()).await.unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 3, 3, 6, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert!(result.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_start_beyond_file_size() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        store.put(&path, "line1\n".into()).await.unwrap();
+
+        let result = get_aligned_bytes(&store, &path, 10, 20, 6, b'\n', 4096)
+            .await
+            .unwrap();
+
+        assert!(result.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_multi_window_extension() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // Create a line longer than scan_window (use scan_window=4)
+        // "aaaaaaaaaa\n" = 11 bytes, need multiple 4-byte windows to find newline
+        store.put(&path, "aaaaaaaaaa\nbbbb\n".into()).await.unwrap();
+
+        // Request [0, 5), end is in the middle of line, need to extend
+        // with scan_window=4, need 2 extensions to reach position 10 (the newline)
+        let result = get_aligned_bytes(&store, &path, 0, 5, 16, b'\n', 4)
+            .await
+            .unwrap();
+
+        assert_eq!(result.unwrap().as_ref(), b"aaaaaaaaaa\n");
+    }
+
+    #[tokio::test]
+    async fn test_get_aligned_bytes_partitions_complete_coverage() {
+        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+
+        // 5 lines, each 10 bytes = 50 bytes total
+        let content = "aaaaaaaaa\nbbbbbbbbb\nccccccccc\nddddddddd\neeeeeeeee\n";
+        store.put(&path, content.into()).await.unwrap();
+        let file_size = content.len() as u64;
+
+        // Split at arbitrary boundaries: [0, 15), [15, 35), [35, 50)
+        let boundaries = vec![0u64, 15, 35, file_size];
+        let mut combined = Vec::new();
+
+        for window in boundaries.windows(2) {
+            let (start, end) = (window[0], window[1]);
+            let bytes =
+                get_aligned_bytes(&store, &path, start, end, file_size, b'\n', 4096)
+                    .await
+                    .unwrap()
+                    .unwrap();
+            combined.extend_from_slice(&bytes);
+        }
+
+        assert_eq!(combined, content.as_bytes());
+    }
+
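+    // The test below contrasts the byte volume requested by the previous
+    // calculate_range-based path with a single aligned fetch; the 10x margin
+    // is the assertion's own threshold, not a measured guarantee.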
+    #[tokio::test]
+    async fn test_get_aligned_bytes_reduces_requested_bytes() {
+        let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        let store = Arc::new(CountingObjectStore::new(Arc::clone(&inner)));
+        let store_dyn: Arc<dyn ObjectStore> = store.clone();
+        let path = Path::from("amplification.json");
+
+        let data = build_fixed_lines(128, 16_384);
+        let file_size = data.len() as u64;
+        inner.put(&path, data.into()).await.unwrap();
+
+        let start = 1_000_003u64;
+        let raw_end = start + 64_000;
+        let end = (raw_end / 128).max(1) * 128;
+
+        let object_meta = inner.head(&path).await.unwrap();
+        let file = PartitionedFile {
+            object_meta,
+            partition_values: vec![],
+            range: Some(FileRange {
+                start: i64::try_from(start).unwrap(),
+                end: i64::try_from(end).unwrap(),
+            }),
+            statistics: None,
+            ordering: None,
+            extensions: None,
+            metadata_size_hint: None,
+        };
+
+        store.reset();
+        let _ = calculate_range(&file, &store_dyn, None).await.unwrap();
+        let old_bytes = store.requested_bytes();
+
+        store.reset();
+        let _ = get_aligned_bytes(
+            &store_dyn,
+            &path,
+            start,
+            end,
+            file_size,
+            b'\n',
+            DEFAULT_BOUNDARY_WINDOW,
+        )
+        .await
+        .unwrap();
+        let new_bytes = store.requested_bytes();
+
+        assert!(
+            old_bytes >= new_bytes * 10,
+            "expected old path to request significantly more bytes, old={old_bytes}, new={new_bytes}"
+        );
+    }
+}
diff --git a/datafusion/datasource-json/src/mod.rs b/datafusion/datasource-json/src/mod.rs
index 3d27d4cc5ef5a..1117f04bb4b38 100644
--- a/datafusion/datasource-json/src/mod.rs
+++ b/datafusion/datasource-json/src/mod.rs
@@ -21,6 +21,7 @@
 #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
 #![deny(clippy::allow_attributes)]
 
+pub mod boundary_utils;
 pub mod file_format;
 pub mod source;
 
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index 5797054f11b9c..2e9de2485c404 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -18,10 +18,11 @@
 //! Execution plan for reading line-delimited JSON files
 
 use std::any::Any;
-use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::io::{BufReader, Cursor, Read, Seek, SeekFrom};
 use std::sync::Arc;
 use std::task::Poll;
 
+use crate::boundary_utils::{DEFAULT_BOUNDARY_WINDOW, get_aligned_bytes};
 use crate::file_format::JsonDecoder;
 
 use datafusion_common::error::{DataFusionError, Result};
@@ -30,9 +31,7 @@ use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
 use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
-use datafusion_datasource::{
-    ListingTableUrl, PartitionedFile, RangeCalculation, as_file_source, calculate_range,
-};
+use datafusion_datasource::{ListingTableUrl, PartitionedFile, as_file_source};
 use datafusion_physical_plan::projection::ProjectionExprs;
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
@@ -188,23 +187,59 @@ impl FileOpener for JsonOpener {
         let file_compression_type = self.file_compression_type.to_owned();
 
         Ok(Box::pin(async move {
-            let calculated_range =
-                calculate_range(&partitioned_file, &store, None).await?;
+            let file_size = partitioned_file.object_meta.size;
+            let location = &partitioned_file.object_meta.location;
 
-            let range = match calculated_range {
-                RangeCalculation::Range(None) => None,
-                RangeCalculation::Range(Some(range)) => Some(range.into()),
-                RangeCalculation::TerminateEarly => {
+            let file_range = if file_compression_type.is_compressed() {
+                None
+            } else {
+                partitioned_file.range.clone()
+            };
+
+            if let Some(file_range) = file_range.as_ref() {
+                let raw_start = u64::try_from(file_range.start).map_err(|_| {
+                    DataFusionError::Internal(
+                        "file range start must be non-negative".to_string(),
+                    )
+                })?;
+                let raw_end = u64::try_from(file_range.end).map_err(|_| {
+                    DataFusionError::Internal(
+                        "file range end must be non-negative".to_string(),
+                    )
+                })?;
+                let aligned_bytes = get_aligned_bytes(
+                    &store,
+                    location,
+                    raw_start,
+                    raw_end,
+                    file_size,
+                    b'\n',
+                    DEFAULT_BOUNDARY_WINDOW,
+                )
+                .await?;
+
+                let Some(bytes) = aligned_bytes else {
+                    return Ok(
+                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
+                    );
+                };
+
+                if bytes.is_empty() {
                     return Ok(
                         futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                     );
                 }
-            };
 
-            let options = GetOptions {
-                range,
-                ..Default::default()
-            };
+                let reader = ReaderBuilder::new(schema)
+                    .with_batch_size(batch_size)
+                    .build(BufReader::new(Cursor::new(bytes)))?;
+
+                return Ok(futures::stream::iter(reader)
+                    .map(|r| r.map_err(Into::into))
+                    .boxed());
+            }
+
+            let options = GetOptions::default();
 
             let result = store
                 .get_opts(&partitioned_file.object_meta.location, options)
@@ -218,7 +253,7 @@ impl FileOpener for JsonOpener {
                 Some(_) => {
                     file.seek(SeekFrom::Start(result.range.start as _))?;
                     let limit = result.range.end - result.range.start;
-                    file_compression_type.convert_read(file.take(limit as u64))?
+                    file_compression_type.convert_read(file.take(limit))?
                 }
             };