Conversation
This patch adds the passing of the current session to the UDTF call. This helps implement session-dependent functions, for example, a function that returns the list of registered tables.
WalkthroughThe PR refactors the ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @martin-augment, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant enhancement to the User-Defined Table Function (UDTF) mechanism by providing UDTF implementations with access to the current session context. This change allows UDTFs to become more dynamic and context-aware, enabling them to interact with the DataFusion session state, such as listing available tables or accessing session-specific configurations. The update involves an API modification to the Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a significant improvement by allowing User-Defined Table Functions (UDTFs) to access session state. This is achieved by introducing a new call_with_args method on the TableFunctionImpl trait, which accepts TableFunctionArgs containing both the expressions and the session. The old call method is deprecated, and a smooth transition path is provided. The changes are consistently applied across the codebase, including core logic, FFI, examples, and documentation.
I've found one critical issue in the new example table_list_udtf.rs, where block_on is used in a way that will likely cause a panic at runtime. This needs to be addressed before merging.
| continue; | ||
| }; | ||
| for table_name in schema.table_names() { | ||
| let Some(provider) = block_on(schema.table(&table_name))? else { |
There was a problem hiding this comment.
Using futures::executor::block_on inside call_with_args is problematic. call_with_args is a synchronous function, but it's called during query planning, which is often initiated from an asynchronous context (e.g., SessionContext::sql). The futures::executor::block_on function will panic if it's called from within an existing async executor, which will be the case here, causing the query to fail at runtime.
This seems to expose a larger issue with the sync nature of call_with_args and the async nature of other APIs it needs to call, like schema.table(). While a full fix might require making call_with_args async (a larger change), this example should avoid this pattern. Perhaps there's a synchronous way to achieve this, or the example needs to be rethought to not require calling an async method from this synchronous context.
There was a problem hiding this comment.
value:useful; category:bug; feedback: The Gemini AI reviewer is correct! It is not a good idea to mix several async runtimes in one application. Since Tokio runtime is already in use it would be good to re-use it here too. Prevents runtime errors due to mixing more than one async runtimes.
PR Review: Provide session to the UDTF callSummaryThis PR adds session context to User-Defined Table Functions (UDTFs), enabling session-dependent functionality. The implementation is well-structured with proper backward compatibility handling. ✅ Strengths1. Excellent Backward Compatibility Design
2. Good API DesignThe new TableFunctionArgs struct (datafusion/catalog/src/table.rs:489-494) is clean and extensible, allowing future additions without breaking changes. 3. Comprehensive FFI SupportThe FFI layer (datafusion/ffi/src/udtf.rs:54-58) properly handles the new API with proper session serialization across FFI boundaries. 4. Good Example ImplementationThe table_list_udtf.rs example effectively demonstrates the feature with a real use case. 🔍 Issues & Concerns1. CRITICAL: Blocking Async Call in ExampleLocation: datafusion-examples/examples/udf/table_list_udtf.rs:99 Using block_on() in a potentially async context can cause deadlocks, poor performance, and runtime panics. Recommendation: Make call_with_args async, OR document that implementations must not use block_on, OR provide an async-safe alternative. 2. Session Downcasting FragilityLocation: datafusion-examples/examples/udf/table_list_udtf.rs:76-82 The downcast to SessionState is fragile across FFI boundaries. Consider adding helper methods to Session trait for common operations. 3. Missing Memory Efficiency in ExampleThe example clones catalog/schema/table names repeatedly which could be memory-intensive for large catalogs. 4. FFI Session Handling ComplexityLocation: datafusion/ffi/src/udtf.rs:123-136 The session handling pattern is complex and error-prone. Consider refactoring for clarity. 🧪 Test CoverageWhat's Covered:
What's Missing:
🔒 Security ConsiderationsNo significant security concerns, but note:
✅ Recommendations SummaryMust Fix (P0):
Should Fix (P1): Nice to Have (P2): 🎯 Final VerdictOverall Assessment: Good implementation with some concerns The PR successfully adds session access to UDTFs with:
Recommendation: Approve with changes - address the block_on() issue and improve test coverage before merging. Great work on maintaining backward compatibility! The deprecation strategy is exemplary. |
🤖 Augment PR SummarySummary: This PR extends DataFusion’s UDTF/table-function interface to provide access to the calling session, enabling session-aware table functions. Changes:
Technical Notes: Existing implementations can keep using 🤖 Was this summary useful? React with 👍 or 👎 |
| note = "Implement `TableFunctionImpl::call_with_args` instead" | ||
| )] | ||
| fn call(&self, _args: &[Expr]) -> Result<Arc<dyn TableProvider>> { | ||
| internal_err!("unimplemented") |
There was a problem hiding this comment.
The default deprecated TableFunctionImpl::call implementation returns internal_err!("unimplemented"), which can leak out to callers of the deprecated API as an “internal” engine error. Consider making this error more descriptive (e.g., pointing users to call_with_args) so it’s clearer what action is needed.
Severity: low
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
There was a problem hiding this comment.
value:good-to-have; category:documentation; feedback: The Augment AI reviewer is correct! The call() method is deprecated by using the macro attribute but its error message could be improved to mention the new call_with_args() method too. This will spare the time of the developer to look in the rustdoc/sources to figure out the new recommendation.
| The simple UDTF used here takes a single `Int64` argument and returns a table with a single column with the value of the | ||
| argument. To create a function in DataFusion, you need to implement the `TableFunctionImpl` trait. This trait has a | ||
| single method, `call`, that takes a slice of `Expr`s and returns a `Result<Arc<dyn TableProvider>>`. | ||
| single method, `call_with_args`, that takes a `TableFunctionArgs` struct and returns a `Result<Arc<dyn TableProvider>>`. |
There was a problem hiding this comment.
This section says TableFunctionImpl has a “single method” and that TableFunctionArgs only includes the argument Exprs, but TableFunctionArgs also carries the calling session (and call still exists as deprecated). It may be worth updating the wording to reflect the actual API surface so readers don’t miss the session parameter.
Severity: low
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
There was a problem hiding this comment.
value:good-to-have; category:documentation; feedback: The Augment AI reviewer is correct! The documentation is updated by replacing call() with call_with_args(), so it will be good to mention that the session state is also available now.
| )); | ||
|
|
||
| #[expect(deprecated)] | ||
| let table_provider = rresult_return!(udtf_inner.call(&args)); |
There was a problem hiding this comment.
FFI_TableFunction::call still dispatches to the deprecated TableFunctionImpl::call, so a UDTF that only implements call_with_args (as now recommended) will return "unimplemented" when invoked via the legacy FFI entrypoint. This seems like it could surprise FFI consumers unless they reliably switch to call_with_args when available.
Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
There was a problem hiding this comment.
value:useful; category:bug; feedback: The Augment AI reviewer is correct! The FFI users should be notified somehow that the call() function is now deprecated and they should use the new call_with_args() method instead.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
datafusion-examples/examples/udf/main.rs (1)
70-81:⚠️ Potential issue | 🟡 MinorUpdate the module-level usage list to include
table_list_udtf.
The new example is wired in, but the doc list near Line 24 doesn't mention it, which makes the CLI usage docs stale.✏️ Suggested doc update
-//! cargo run --example udf -- [all|adv_udaf|adv_udf|adv_udwf|async_udf|udaf|udf|udtf|udwf] +//! cargo run --example udf -- [all|adv_udaf|adv_udf|adv_udwf|async_udf|udaf|udf|udtf|udwf|table_list_udtf] @@ //! - `udtf` //! (file: simple_udtf.rs, desc: Simple UDTF example) +//! +//! - `table_list_udtf` +//! (file: table_list_udtf.rs, desc: Session-aware UDTF table list example)
🤖 Fix all issues with AI agents
In `@datafusion-examples/examples/udf/table_list_udtf.rs`:
- Line 99: The sync UDTF method call_with_args currently uses
futures::executor::block_on when calling schema.table(&table_name), which will
panic inside a tokio runtime; replace that blocking call with a tokio-safe
approach: either run the async call inside tokio::task::block_in_place combined
with tokio::runtime::Handle::current().block_on(schema.table(&table_name)) or,
better, refactor to avoid async work in call_with_args by precomputing/caching
the table metadata outside the call_with_args implementation and using that
cache instead; update the call site in call_with_args (the
block_on(schema.table(&table_name)) invocation) to one of these tokio-safe
alternatives and ensure error handling remains consistent.
In `@docs/source/library-user-guide/functions/adding-udfs.md`:
- Around line 1391-1395: Update the documentation sentence describing the
`call_with_args` method to note that `TableFunctionArgs` carries both the
function arguments (a slice of `Expr`s) and the `session` field; mention
`session` as the key addition in this PR and, where relevant, suggest using
`session` for any validation or context needed when `call_with_args` parses
`Expr`s and returns a `Result<Arc<dyn TableProvider>>`.
🧹 Nitpick comments (2)
datafusion/ffi/src/udtf.rs (1)
48-58:call_with_argsfield is private whilecallis public — is this intentional?
callon line 50 ispub, butcall_with_argson line 54 is private. This works becauseForeignTableFunctionis in the same module, but the asymmetry is notable. If this is intentional (to discourage direct use of the raw FFI function pointer), consider adding a brief comment explaining the rationale.datafusion-examples/examples/udf/table_list_udtf.rs (1)
76-82: Downcast toSessionStatelimits portability across session implementations.The
downcast_ref::<SessionState>()will fail for non-standard session types (e.g.,ForeignSessionfrom the FFI path). This is acceptable for an example, but consider adding a doc comment noting this limitation so users adapting this pattern for production are aware.
| continue; | ||
| }; | ||
| for table_name in schema.table_names() { | ||
| let Some(provider) = block_on(schema.table(&table_name))? else { |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
rg -n 'block_on' --type rust -C3 -g '!target/**' | head -100Repository: martin-augment/datafusion
Length of output: 7540
🏁 Script executed:
# Check if the file actually uses block_on and understand the context
cat datafusion-examples/examples/udf/table_list_udtf.rs | head -130 | tail -50Repository: martin-augment/datafusion
Length of output: 1766
🏁 Script executed:
# Check if there are other UDTF examples or table functions that handle async differently
find datafusion-examples -name "*.rs" -type f | xargs grep -l "TableFunctionImpl" | head -5Repository: martin-augment/datafusion
Length of output: 168
🏁 Script executed:
# Check the actual usage in main.rs of the examples to understand execution context
find datafusion-examples -name "main.rs" -type f | xargs grep -l "table_list_udtf\|tokio" | head -3Repository: martin-augment/datafusion
Length of output: 224
🏁 Script executed:
# Check the full table_list_udtf.rs file to see how it's executed
cat datafusion-examples/examples/udf/table_list_udtf.rsRepository: martin-augment/datafusion
Length of output: 4380
🏁 Script executed:
# Check the simple_udtf.rs example for comparison
cat datafusion-examples/examples/udf/simple_udtf.rs | head -80Repository: martin-augment/datafusion
Length of output: 3055
🏁 Script executed:
# Look for any main function or how the example uses tokio
rg -n "tokio::|#\[tokio|#\[test\]|fn main" datafusion-examples/examples/udf/table_list_udtf.rsRepository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Check if there's documentation about TableFunctionImpl trait
rg -n "trait TableFunctionImpl" --type rust -A 10 | head -30Repository: martin-augment/datafusion
Length of output: 818
🏁 Script executed:
# Look for main.rs files in examples/udf directory
find datafusion-examples/examples/udf -name "*.rs" -type fRepository: martin-augment/datafusion
Length of output: 547
🏁 Script executed:
# Check if there's an integration test or main file that shows how table_list_udtf is called
rg -n "table_list_udtf\|async fn main" datafusion-examples/examples/ -A 3 | head -50Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Check Cargo.toml to understand how examples are meant to be run
cat datafusion-examples/Cargo.toml | head -60Repository: martin-augment/datafusion
Length of output: 2259
🏁 Script executed:
# Search for any documentation on how UDTF planning happens - is it sync or within async context?
rg -n "call_with_args|planning" datafusion/catalog/src/table.rs -B 2 -A 5 | head -40Repository: martin-augment/datafusion
Length of output: 1156
🏁 Script executed:
# Check the main.rs file in examples/udf
cat datafusion-examples/examples/udf/main.rsRepository: martin-augment/datafusion
Length of output: 4102
🏁 Script executed:
# Check if call_with_args is invoked during planning or execution - look at how table functions are invoked
rg -n "call_with_args|TableFunctionImpl" datafusion/core/src/ -B 2 -A 3 | head -60Repository: martin-augment/datafusion
Length of output: 3928
🏁 Script executed:
# Check if the invocation happens within a tokio context or async context
rg -n "fn analyze_table_function|impl.*Analyzer" datafusion/core/src/ -B 2 -A 8 | head -50Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Find where call_with_args is invoked in the codebase
rg -n "call_with_args" datafusion/core/src/ -B 3 -A 3 | head -80Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Check if table functions are resolved during planning or execution
rg -n "TableFunctionImpl|table_function" datafusion-optimizer/src/ -B 2 -A 2 | head -60Repository: martin-augment/datafusion
Length of output: 134
🏁 Script executed:
# Look for the logical plan building for table functions
rg -n "TableFunction|table_list" datafusion-sql/src/ -B 1 -A 3 | head -50Repository: martin-augment/datafusion
Length of output: 128
🏁 Script executed:
# Find where call_with_args is invoked
rg -n "call_with_args" --type rustRepository: martin-augment/datafusion
Length of output: 2945
🏁 Script executed:
# Check which directories exist in datafusion
ls -la datafusion/ | head -20Repository: martin-augment/datafusion
Length of output: 1205
🏁 Script executed:
# Look for table function resolution during planning
rg -n "TableFunction|\.table\(" datafusion/ -B 2 -A 3 --type rust | head -100Repository: martin-augment/datafusion
Length of output: 7789
🏁 Script executed:
# Find get_table_function_source implementation
rg -n "get_table_function_source" --type rust -B 2 -A 10 | head -80Repository: martin-augment/datafusion
Length of output: 4007
🏁 Script executed:
# Check if table functions are resolved sync or async
rg -n "fn get_table_function_source|impl.*ContextProvider" datafusion/core/src/ -A 8 | head -80Repository: martin-augment/datafusion
Length of output: 5734
🏁 Script executed:
# Look at how SessionState provides table functions
rg -n "table_functions|get_table_function" datafusion/core/src/execution/session_state.rs -B 2 -A 5 | head -80Repository: martin-augment/datafusion
Length of output: 3651
🏁 Script executed:
# Get the full implementation of get_table_function_source
cat datafusion/core/src/execution/session_state.rs | sed -n '1827,1860p'Repository: martin-augment/datafusion
Length of output: 1327
🏁 Script executed:
# Check if schema.table() is async
rg -n "async.*fn table\(" datafusion/catalog/src/ --type rust -B 2 -A 5 | head -50Repository: martin-augment/datafusion
Length of output: 3533
🏁 Script executed:
# Check how ctx.sql() works - does it synchronously plan while in async context?
rg -n "async fn sql\|pub async fn sql" datafusion/core/src/ -A 10 | head -60Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Look for tests of table functions to see if they handle block_on safely
rg -n "block_on|TableFunction" datafusion/core/tests/user_defined/user_defined_table_functions.rs -B 2 -A 5 | head -80Repository: martin-augment/datafusion
Length of output: 1177
🏁 Script executed:
# Check if the example actually runs and whether it panics
cat datafusion-examples/examples/udf/table_list_udtf.rs | grep -A 5 "pub async fn table_list_udtf"Repository: martin-augment/datafusion
Length of output: 300
🏁 Script executed:
# Check if there are any tests that actually run table_list_udtf
rg -n "table_list" datafusion/core/tests/ --type rust -B 2 -A 2Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Check if block_on can somehow be safe in this context by looking at tokio version
cat Cargo.lock | grep -A 3 "name = \"tokio\"" | head -10Repository: martin-augment/datafusion
Length of output: 245
🏁 Script executed:
# Verify the actual behavior: does planning happen within tokio context?
cat datafusion/core/src/execution/context/mod.rs | sed -n '1,100p' | grep -A 20 "async fn sql"Repository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Check if table_list_udtf is tested anywhere
rg -n "table_list_udtf\|TableListUdtf" --type rustRepository: martin-augment/datafusion
Length of output: 51
🏁 Script executed:
# Look for integration tests that would run the example
find . -name "*.rs" -path "*/tests/*" | xargs grep -l "examples\|table_list" 2>/dev/null | head -5Repository: martin-augment/datafusion
Length of output: 241
🏁 Script executed:
# Check documentation or comments about block_on safety
rg -n "futures::executor::block_on" --type rust -B 5 -A 2 | head -100Repository: martin-augment/datafusion
Length of output: 3489
Use a tokio-safe alternative to block on async work within a sync context.
futures::executor::block_on() panics if called from within an active tokio runtime. Since this UDTF's call_with_args is invoked during SQL planning (which runs within the tokio context from #[tokio::main]), consider using tokio::task::block_in_place() combined with Handle::current().block_on(), or refactor to pre-compute table metadata before planning.
Suggested fix
-use futures::executor::block_on;
+use tokio::task;At the call site:
- let Some(provider) = block_on(schema.table(&table_name))? else {
+ let Some(provider) = task::block_in_place(|| {
+ tokio::runtime::Handle::current().block_on(schema.table(&table_name))
+ })? else {Alternatively, if you know a tokio runtime is always available, you could cache the table list outside the table function to avoid calling async code in the sync trait method.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let Some(provider) = block_on(schema.table(&table_name))? else { | |
| let Some(provider) = task::block_in_place(|| { | |
| tokio::runtime::Handle::current().block_on(schema.table(&table_name)) | |
| })? else { |
🤖 Prompt for AI Agents
In `@datafusion-examples/examples/udf/table_list_udtf.rs` at line 99, The sync
UDTF method call_with_args currently uses futures::executor::block_on when
calling schema.table(&table_name), which will panic inside a tokio runtime;
replace that blocking call with a tokio-safe approach: either run the async call
inside tokio::task::block_in_place combined with
tokio::runtime::Handle::current().block_on(schema.table(&table_name)) or,
better, refactor to avoid async work in call_with_args by precomputing/caching
the table metadata outside the call_with_args implementation and using that
cache instead; update the call site in call_with_args (the
block_on(schema.table(&table_name)) invocation) to one of these tokio-safe
alternatives and ensure error handling remains consistent.
There was a problem hiding this comment.
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! It is not a good idea to mix several async runtimes in one application. Since Tokio runtime is already in use it would be good to re-use it here too. Prevents runtime errors due to mixing more than one async runtimes.
| single method, `call_with_args`, that takes a `TableFunctionArgs` struct and returns a `Result<Arc<dyn TableProvider>>`. | ||
| Passed struct includes function arguments as a slice of `Expr`s. | ||
|
|
||
| In the `call` method, you parse the input `Expr`s and return a `TableProvider`. You might also want to do some | ||
| In the `call_with_args` method, you parse the input `Expr`s and return a `TableProvider`. You might also want to do some | ||
| validation of the input `Expr`s, e.g. checking that the number of arguments is correct. |
There was a problem hiding this comment.
Mention that TableFunctionArgs also carries the session.
The new sentence only references args, but the struct also exposes session, which is the key addition of this PR.
✏️ Suggested wording update
-single method, `call_with_args`, that takes a `TableFunctionArgs` struct and returns a `Result<Arc<dyn TableProvider>>`.
-Passed struct includes function arguments as a slice of `Expr`s.
+single method, `call_with_args`, that takes a `TableFunctionArgs` struct and returns a `Result<Arc<dyn TableProvider>>`.
+The struct includes the function arguments (`args`) and the current session (`session`).📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| single method, `call_with_args`, that takes a `TableFunctionArgs` struct and returns a `Result<Arc<dyn TableProvider>>`. | |
| Passed struct includes function arguments as a slice of `Expr`s. | |
| In the `call` method, you parse the input `Expr`s and return a `TableProvider`. You might also want to do some | |
| In the `call_with_args` method, you parse the input `Expr`s and return a `TableProvider`. You might also want to do some | |
| validation of the input `Expr`s, e.g. checking that the number of arguments is correct. | |
| single method, `call_with_args`, that takes a `TableFunctionArgs` struct and returns a `Result<Arc<dyn TableProvider>>`. | |
| The struct includes the function arguments (`args`) and the current session (`session`). | |
| In the `call_with_args` method, you parse the input `Expr`s and return a `TableProvider`. You might also want to do some | |
| validation of the input `Expr`s, e.g. checking that the number of arguments is correct. |
🤖 Prompt for AI Agents
In `@docs/source/library-user-guide/functions/adding-udfs.md` around lines 1391 -
1395, Update the documentation sentence describing the `call_with_args` method
to note that `TableFunctionArgs` carries both the function arguments (a slice of
`Expr`s) and the `session` field; mention `session` as the key addition in this
PR and, where relevant, suggest using `session` for any validation or context
needed when `call_with_args` parses `Expr`s and returns a `Result<Arc<dyn
TableProvider>>`.
There was a problem hiding this comment.
value:good-to-have; category:documentation; feedback: The CodeRabbit AI reviewer is correct! The documentation is updated by replacing call() with call_with_args(), so it will be good to mention that the session state is also available now.
value:useful; category:bug; feedback: The Claude AI reviewer is correct! It is not a good idea to mix several async runtimes in one application. Since Tokio runtime is already in use it would be good to re-use it here too. Prevents runtime errors due to mixing more than one async runtimes. |
value:annoying; category:bug; feedback: The Claude AI reviewer is not correct! This is just an example that shows how to use the provided session state in a UDTF. It is not supposed to be used in FFI environment or to be optimal. |
value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! The "usage" of the UDTF examples should be updated to mention the new table_list_udtf example too. |
20222: To review by AI