18457: Support Arrow IPC Stream Files #2

Open

martin-augment wants to merge 13 commits into main from
pr-18457-2025-11-04-11-28-07

Conversation

martin-augment (Owner) commented Nov 4, 2025

18457: To review by AI


Note

Adds Arrow IPC stream format support via the new ArrowStreamSource/ArrowStreamOpener with auto-detection between file and stream formats, updates schema inference, and extends unit tests and sqllogictests to validate both.

  • Arrow IPC support:
    • Introduce ArrowFileSource and ArrowStreamSource with ArrowFileOpener/ArrowStreamOpener.
    • ArrowStreamSource reads sequentially (no range support); ArrowFileSource retains range-based parallel reads.
    • Auto-detect IPC file vs stream in ArrowFormat::create_physical_plan by inspecting header bytes, and pick the appropriate source (see the sketch after this list).
    • Default writer/sink remains file format; file_source() now returns ArrowFileSource.
  • Schema inference:
    • Enhance to handle both IPC file (FileReader) and stream (StreamReader), including rewind on fallback and new infer_ipc_schema helper.
    • Improved error propagation from short/partial streams.
  • API/exports/docs:
    • Replace ArrowSource re-exports with ArrowFileSource/ArrowStreamSource and their openers.
    • Update FileSource docs and diagrams to reference new Arrow sources; file_type includes arrow_stream.
  • Tests:
    • Add unit tests for opener behavior (with/without ranges), schema inference for file/stream, and error cases.
    • Update schema adapter integration to use ArrowFileSource.
    • Extend sqllogictest to validate stream tables and parity with file format.
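
A minimal sketch of the header-based detection, assuming the standard Arrow IPC layout (the file format begins with the ARROW1 magic; stream messages begin with the 0xFFFFFFFF continuation marker). Names here are illustrative, not the exact DataFusion implementation:

const ARROW_MAGIC: [u8; 6] = *b"ARROW1"; // Arrow IPC *file* format preamble
const CONTINUATION_MARKER: [u8; 4] = [0xFF, 0xFF, 0xFF, 0xFF]; // IPC *stream* message prefix

fn looks_like_ipc_file(header: &[u8]) -> bool {
    header.len() >= ARROW_MAGIC.len() && header[..6] == ARROW_MAGIC
}

fn looks_like_ipc_stream(header: &[u8]) -> bool {
    header.len() >= CONTINUATION_MARKER.len() && header[..4] == CONTINUATION_MARKER
}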

Written by Cursor Bugbot for commit ffeca09.

coderabbitai bot commented Nov 4, 2025

Walkthrough

This pull request adds support for dual Arrow IPC formats by refactoring the Arrow datasource layer. It renames the existing ArrowSource to ArrowFileSource and ArrowOpener to ArrowFileOpener, while introducing new ArrowStreamSource and ArrowStreamOpener types for stream format handling. The implementation includes dynamic format detection, schema inference for both formats, and fallback logic.

Changes

  • Public API exports (datafusion/core/src/datasource/physical_plan/mod.rs): Updated public re-exports from ArrowOpener, ArrowSource to ArrowFileOpener, ArrowFileSource, ArrowStreamOpener, ArrowStreamSource.
  • Core Arrow source implementation (datafusion/datasource-arrow/src/source.rs): Renamed ArrowSource to ArrowFileSource and ArrowOpener to ArrowFileOpener; introduced new ArrowStreamSource and ArrowStreamOpener with full FileSource and FileOpener implementations; file types are "arrow" for the file format and "arrow_stream" for the stream format.
  • Arrow file format logic (datafusion/datasource-arrow/src/file_format.rs): Added dual-format schema inference with infer_ipc_schema and infer_ipc_schema_ignoring_preamble_bytes; implemented format detection by inspecting ARROW_MAGIC and CONTINUATION_MARKER; updated create_physical_plan to dynamically select ArrowFileSource or ArrowStreamSource; added a fallback from FileReader to StreamReader.
  • Test updates (datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs): Updated imports and usage from ArrowSource to ArrowFileSource; replaced ArrowSource::default() with ArrowFileSource::default().
  • Documentation updates (datafusion/datasource/src/file.rs, datafusion/datasource/src/source.rs): Updated type references in documentation from ArrowSource to ArrowFileSource; added entries for ArrowStreamSource in trait documentation.
  • Integration tests (datafusion/sqllogictest/test_files/arrow_files.slt): Added a new test block for the Arrow IPC stream format; validates an arrow_stream external table with schema inference, projection pushdown, filtering, and parity checks against the file-based format.

augmentcode bot left a comment

Review completed. 1 suggestion posted.

);

let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
let meta_len = i32::from_le_bytes(meta_len);

Consider validating that meta_len is non-negative before casting to usize to avoid huge allocations or reads on corrupt/invalid IPC input (e.g., negative values interpreted as large usize).


Owner Author

value:useful; category:bug; feedback: The AI reviewer is correct that the file content could be crafted so that meta_len decodes to a negative value. This may lead to an out-of-memory error when trying to allocate that amount of memory.
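
A minimal illustration of the wraparound the reviewer describes, assuming a 64-bit target:

let raw_meta_len: i32 = -1; // e.g., decoded from crafted file content
// `as usize` sign-extends, so a negative length becomes an enormous value:
assert_eq!(raw_meta_len as usize, usize::MAX); // 18_446_744_073_709_551_615
// Vec::with_capacity(raw_meta_len as usize) would then attempt a huge allocation.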

coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
datafusion/datasource-arrow/src/file_format.rs (1)

436-470: Guard against negative IPC metadata sizes

Casting a negative meta_len to usize wraps to a huge value, so a malformed stream can drive Vec::with_capacity(meta_len as usize) into multi-gigabyte allocation and exhaust memory during schema inference. Reject negative lengths before converting to usize.

-    let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
-    let meta_len = i32::from_le_bytes(meta_len);
+    let meta_len_bytes = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
+    let raw_meta_len = i32::from_le_bytes(meta_len_bytes);
+    let meta_len = usize::try_from(raw_meta_len).map_err(|_| {
+        ArrowError::ParseError(format!(
+            "IPC metadata length must be non-negative: {raw_meta_len}"
+        ))
+    })?;
@@
-    if bytes[rest_of_bytes_start_index..].len() < meta_len as usize {
+    if bytes[rest_of_bytes_start_index..].len() < meta_len {
@@
-        let mut block_data = Vec::with_capacity(meta_len as usize);
+        let mut block_data = Vec::with_capacity(meta_len);
@@
-        let size_to_read = meta_len as usize - block_data.len();
+        let size_to_read = meta_len - block_data.len();
@@
-        let end_index = meta_len as usize + rest_of_bytes_start_index;
+        let end_index = meta_len + rest_of_bytes_start_index;
datafusion/datasource-arrow/src/source.rs (1)

94-99: Replace expect() with a proper error to avoid panics.

The statistics() method will panic if projected_statistics is not set. This could crash the application at runtime if the statistics are queried before being initialized.

Apply this diff to return a proper error instead:

 fn statistics(&self) -> Result<Statistics> {
-    let statistics = &self.projected_statistics;
-    Ok(statistics
-        .clone()
-        .expect("projected_statistics must be set"))
+    self.projected_statistics.clone().ok_or_else(|| {
+        exec_datafusion_err!("Statistics not available for ArrowFileSource")
+    })
 }
🧹 Nitpick comments (2)
datafusion/datasource-arrow/src/source.rs (2)

120-214: Consider extracting common implementation to reduce duplication.

ArrowStreamSource and ArrowFileSource have nearly identical implementations (only differing in file_type(), repartitioned(), and the opener type). This represents significant code duplication that could make maintenance harder.

Consider introducing a macro or a shared base struct to reduce duplication, or document why the duplication is intentional.
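
As one possibility, a hedged sketch of a declarative macro that stamps out the duplicated builder-style methods (the field names and method shapes here are assumptions, not the actual struct layout or FileSource trait surface):

macro_rules! impl_shared_arrow_source {
    ($source:ty) => {
        impl $source {
            // Setters currently duplicated verbatim between the two sources.
            pub fn with_batch_size(mut self, batch_size: usize) -> Self {
                self.batch_size = Some(batch_size);
                self
            }

            pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
                self.projection = projection;
                self
            }
        }
    };
}

impl_shared_arrow_source!(ArrowFileSource);
impl_shared_arrow_source!(ArrowStreamSource);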


410-453: LGTM!

Good test coverage for both file and stream formats without ranges. The schema inference logic (lines 428-431) that tries FileReader first then falls back to StreamReader is practical.

Consider extracting the schema inference logic (lines 428-431) into a test helper function since it's used across multiple tests:

fn infer_schema(path: &str) -> Result<Arc<Schema>> {
    match FileReader::try_new(File::open(path)?, None) {
        Ok(reader) => Ok(reader.schema()),
        Err(_) => Ok(StreamReader::try_new(File::open(path)?, None)?.schema()),
    }
}
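
A test could then call the helper like this (the path is hypothetical):

let schema = infer_schema("tests/data/example.arrow")?;
assert!(!schema.fields().is_empty());
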
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between db5f47c and ffeca09.

📒 Files selected for processing (7)
  • datafusion/core/src/datasource/physical_plan/mod.rs (1 hunks)
  • datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs (2 hunks)
  • datafusion/datasource-arrow/src/file_format.rs (9 hunks)
  • datafusion/datasource-arrow/src/source.rs (6 hunks)
  • datafusion/datasource/src/file.rs (1 hunks)
  • datafusion/datasource/src/source.rs (2 hunks)
  • datafusion/sqllogictest/test_files/arrow_files.slt (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
datafusion/datasource-arrow/src/source.rs (5)
datafusion/datasource/src/file.rs (13)
  • as_file_source (40-42)
  • create_file_opener (57-62)
  • as_any (64-64)
  • with_batch_size (66-66)
  • with_schema (68-68)
  • with_statistics (72-72)
  • statistics (80-80)
  • with_projection (70-70)
  • repartitioned (94-117)
  • metrics (78-78)
  • file_type (82-82)
  • with_schema_adapter_factory (142-150)
  • schema_adapter_factory (155-157)
datafusion/datasource-json/src/source.rs (15)
  • from (94-96)
  • create_file_opener (100-114)
  • new (62-74)
  • new (88-90)
  • as_any (116-118)
  • with_batch_size (120-124)
  • with_schema (126-128)
  • with_statistics (129-133)
  • statistics (143-148)
  • with_projection (135-137)
  • metrics (139-141)
  • file_type (150-152)
  • with_schema_adapter_factory (154-162)
  • schema_adapter_factory (164-166)
  • open (179-244)
datafusion/datasource/src/file_scan_config.rs (11)
  • from (503-521)
  • from (2132-2170)
  • as_any (546-548)
  • with_batch_size (428-431)
  • with_statistics (368-371)
  • with_projection (336-338)
  • repartitioned (586-600)
  • metrics (676-678)
  • open (525-544)
  • projection (1834-1838)
  • file (2149-2165)
datafusion/datasource-avro/src/source.rs (15)
  • create_file_opener (66-76)
  • new (51-53)
  • as_any (78-80)
  • with_batch_size (82-86)
  • with_schema (88-93)
  • with_statistics (95-99)
  • statistics (111-116)
  • with_projection (101-105)
  • repartitioned (122-130)
  • metrics (107-109)
  • file_type (118-120)
  • with_schema_adapter_factory (132-140)
  • schema_adapter_factory (142-144)
  • open (55-62)
  • open (161-184)
datafusion/datasource/src/mod.rs (1)
  • new_with_range (142-158)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Cursor Bugbot
🔇 Additional comments (7)
datafusion/datasource-arrow/src/source.rs (7)

19-19: LGTM!

The new imports (Cursor, FileReader, StreamReader) are appropriately used throughout the implementation to support both Arrow IPC file and stream formats.

Also applies to: 26-26


169-184: LGTM!

The repartitioned() implementation correctly returns None for stream format since it doesn't support range-based parallel reading. The detailed comment explains the rationale well.
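
A hedged sketch of that opt-out; the exact FileSource::repartitioned signature is inferred from the trait references above and may differ:

fn repartitioned(
    &self,
    _target_partitions: usize,
    _repartition_file_min_size: usize,
    _output_ordering: Option<LexOrdering>,
    _config: &FileScanConfig,
) -> Result<Option<FileScanConfig>> {
    // Stream files cannot be split on byte ranges, so never repartition.
    Ok(None)
}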


223-228: LGTM!

Correctly rejects range-based reading for stream format with a clear error message. This is the right approach since Arrow IPC Stream format lacks the footer needed for efficient range-based seeks.
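
A hedged sketch of such a guard at the top of ArrowStreamOpener::open (the field name is an assumption; exec_datafusion_err! is DataFusion's error macro):

if file_meta.range.is_some() {
    return Err(exec_datafusion_err!(
        "range-based reads are not supported for the Arrow IPC stream format: \
         the stream format has no footer to locate record-batch boundaries"
    ));
}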


236-251: LGTM! Note: File payload not supported on wasm.

The implementation correctly handles both File and Stream payloads using StreamReader. The #[cfg(not(target_arch = "wasm32"))] indicates File payload is unavailable on wasm, which appears intentional (only Stream payload would work on wasm).

If wasm support for File payload is expected in the future, please ensure this limitation is documented or tracked.


263-396: LGTM!

The ArrowFileOpener correctly uses FileReader for Arrow IPC File format and properly handles both non-range and range-based reading scenarios. The complex range-based logic appropriately reads the footer, dictionaries, and filters record batches based on the specified range.
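
For reference, the on-disk layout of the Arrow IPC file format (per the Arrow specification) is what makes this footer-driven, range-based reading possible:

// <magic "ARROW1"> <padding to an 8-byte boundary>
// <streaming body: dictionary batches, then record batches>
// <footer: flatbuffer with block offsets for dictionaries and batches>
// <footer length: int32, little-endian>
// <magic "ARROW1">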


455-495: LGTM!

Validates that ArrowFileOpener correctly handles range-based reading for the file format.


497-536: LGTM!

Good negative test confirming that ArrowStreamOpener properly rejects range-based reading attempts.

Comment on lines +190 to +195
fn statistics(&self) -> Result<Statistics> {
let statistics = &self.projected_statistics;
Ok(statistics
.clone()
.expect("projected_statistics must be set"))
}
coderabbitai bot commented Nov 4, 2025

⚠️ Potential issue | 🔴 Critical

Replace expect() with a proper error to avoid panics.

The statistics() method will panic if projected_statistics is not set, similar to the issue in ArrowFileSource.

Apply this diff:

 fn statistics(&self) -> Result<Statistics> {
-    let statistics = &self.projected_statistics;
-    Ok(statistics
-        .clone()
-        .expect("projected_statistics must be set"))
+    self.projected_statistics.clone().ok_or_else(|| {
+        exec_datafusion_err!("Statistics not available for ArrowStreamSource")
+    })
 }
🤖 Prompt for AI Agents
In datafusion/datasource-arrow/src/source.rs around lines 190 to 195, replace
the panic-causing expect() by checking projected_statistics and returning a
Result::Err when it's None; specifically, if projected_statistics is Some(s)
return Ok(s.clone()), otherwise return an appropriate DataFusion error (e.g.
DataFusionError::Internal or DataFusionError::Execution with a clear message
like "projected_statistics not set") so the function returns a proper error
instead of panicking.

Owner Author

value:useful; category:bug; feedback: The AI reviewer is correct that using .expect() in a library may cause a panic and thus a full stop of the user application. It is always better to return a Result::Err instead and let the user application decide how to handle it.

@martin-augment, thank you for confirming this analysis! You're absolutely right that library code should always return proper errors rather than panicking, giving user applications the control to handle failures gracefully. I appreciate the feedback and the clear categorization.

The suggested fix will replace the .expect() with a proper Result::Err using ok_or_else(), ensuring that callers receive a clear error message when statistics haven't been initialized rather than experiencing an unexpected panic.

🐰✨
(_/)
(•.•)
/>📚 noted and appreciated
