perf: don't construct a tokio runtime for each query #1226
Conversation
Walkthrough

This pull request refactors the query execution flow across multiple modules. The changes remove the use of blocking tasks in favor of direct asynchronous calls. In the airplane and HTTP handlers, query execution now obtains a stream directly from a central parser module and executes queries asynchronously with improved error handling. Additionally, the public interface is expanded with a new `execute` function and a `StreamNotFound` error variant.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant AirService
    participant Parseable
    participant Query
    Client->>AirService: GET query request
    AirService->>Parseable: get_stream(stream_name)
    Parseable-->>AirService: stream / error
    AirService->>Query: query.execute(&stream)
    Query->>stream: get_time_partition()
    Query-->>AirService: results / error
    AirService-->>Client: response
```
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/alerts/alerts_utils.rs (1)

34-34: Avoid potential confusion with global usage

By referencing `PARSEABLE` directly, you rely on a global context, which may make testing or concurrency more complex. Consider documenting concurrency safety or injecting dependencies if feasible.

src/query/mod.rs (1)

65-77: Dedicated async runtime for queries

Providing a specialized `QUERY_RUNTIME` can improve performance and isolation for query processing. Ensure it does not conflict with any existing runtime usage within your application.
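The build-once, share-everywhere pattern behind `QUERY_RUNTIME` can be sketched with the standard library alone. Note that `FakeRuntime` and `query_runtime` below are illustrative stand-ins, not names from the PR; the real code wraps `tokio::runtime::Runtime` in a `Lazy`:

```rust
use std::sync::OnceLock;
use std::thread;

// Stand-in for `tokio::runtime::Runtime`: any expensive, shared resource
// that should be built once and reused by every query.
struct FakeRuntime {
    id: u32,
}

// The PR uses `Lazy<Runtime>`; with the standard library alone, `OnceLock`
// provides the same build-once semantics.
static QUERY_RUNTIME: OnceLock<FakeRuntime> = OnceLock::new();

fn query_runtime() -> &'static FakeRuntime {
    QUERY_RUNTIME.get_or_init(|| FakeRuntime { id: 1 })
}

fn main() {
    // Every caller, on any thread, observes the same instance.
    let a = query_runtime().id;
    let b = thread::spawn(|| query_runtime().id).join().unwrap();
    assert_eq!(a, b);
    println!("shared runtime id: {a}");
}
```

The key property is that initialization happens exactly once, on first use, no matter how many concurrent queries race to access the static.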
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)

- src/alerts/alerts_utils.rs (2 hunks)
- src/alerts/mod.rs (4 hunks)
- src/handlers/airplane.rs (2 hunks)
- src/handlers/http/query.rs (3 hunks)
- src/parseable/mod.rs (1 hunks)
- src/query/mod.rs (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- src/handlers/airplane.rs
- src/parseable/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
🔇 Additional comments (13)
src/alerts/alerts_utils.rs (1)

141-143: Time partition usage is consistent

Retrieving `time_partition` from the stream ensures precise data filtering. This approach is consistent with the rest of the code for retrieving time-based partitions.

src/handlers/http/query.rs (3)
42-44: New imports for streamlined error handling and query execution

Importing `StreamNotFound`, `PARSEABLE`, and `execute` is a clear approach, aligning with the new asynchronous query strategy.

133-134: Switch to the new `execute` function

Replacing the old blocking query approach with `execute(query, &table_name).await?` improves performance by leveraging the dedicated async runtime.

322-323: Extended error types

The addition of `StreamNotFound` to `QueryError` improves clarity when indicating that a stream was not located.

src/alerts/mod.rs (3)
40-40: Introduced global reference for parseable

Importing `PARSEABLE` and `StreamNotFound` ensures consistent usage of the global context and the specialized error in the alerts system.

706-707: New `StreamNotFound` variant

Defining a dedicated error variant clarifies potential missing-stream issues within the alerts module.

721-721: Specific HTTP status for missing streams

Mapping `StreamNotFound` to the `NOT_FOUND` status code provides precise error responses for clients.

src/query/mod.rs (6)
45-45: Add `Runtime` import

Importing `tokio::runtime::Runtime` is crucial for creating the dedicated query execution environment.

150-150: Revised signature for `execute`

Allowing an optional `time_partition` reference clarifies partition usage, enabling fallback behavior if no partition is provided.

173-176: `get_dataframe` with optional time partition

This adaptation ensures a consistent interface design across the module, especially for partition-based queries.

185-185: `final_logical_plan` now supports optional partition

Extending the function signature to accept `Option<&String>` aligns with the rest of the code handling partitions gracefully.

505-506: `transform` function parameter

By passing `Option<&String>` for the partition, the code can branch between a custom partition key and the default timestamp key.

563-563: `table_contains_any_time_filters` includes optional partition

Recognizing both the user-specified partition and the default timestamp is essential for correct filter application.
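The fallback the comments above describe — use the custom partition column when one is configured, otherwise the default timestamp column — can be sketched as follows. The `time_column` helper and the `"p_timestamp"` key are illustrative assumptions, not names taken from the diff:

```rust
// Illustrative default; the real key name lives in the Parseable codebase.
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

// Hypothetical helper: pick the column used for time filtering, falling
// back to the default timestamp key when no custom partition is set.
fn time_column(time_partition: Option<&String>) -> &str {
    time_partition
        .map(String::as_str)
        .unwrap_or(DEFAULT_TIMESTAMP_KEY)
}

fn main() {
    let custom = String::from("event_time");
    assert_eq!(time_column(Some(&custom)), "event_time");
    assert_eq!(time_column(None), "p_timestamp");
    println!("ok");
}
```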
Signed-off-by: Devdutt Shenoi <devdutt@outlook.in>
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/query/mod.rs (1)

69-80: Consider handling the JoinError instead of using expect.

The implementation correctly spawns query execution on the dedicated runtime. However, the `.expect("The Join should have been successful")` on line 79 could lead to a panic if the spawned task fails. Consider handling this error case more gracefully.

```diff
 pub async fn execute(
     query: Query,
     stream_name: &str,
 ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
     let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();

     QUERY_RUNTIME
         .spawn(async move { query.execute(time_partition.as_ref()).await })
         .await
-        .expect("The Join should have been successful")
+        .map_err(|e| ExecuteError::Datafusion(DataFusionError::Execution(format!("Join error: {}", e))))?
 }
```
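The shape of the suggested refactor — converting a join failure into the caller's error type instead of panicking — can be mimicked synchronously with `std::thread`. The `ExecuteError` enum here is a simplified stand-in for the real one:

```rust
use std::thread;

// Simplified stand-in for the PR's `ExecuteError`.
#[derive(Debug)]
enum ExecuteError {
    Join(String),
}

fn run_query() -> Result<u32, ExecuteError> {
    // `join()` plays the role of awaiting the spawned task: `map_err`
    // translates a join failure into `ExecuteError`, and `?` then yields
    // the inner Result, exactly as in the suggested diff.
    thread::spawn(|| Ok::<u32, ExecuteError>(42))
        .join()
        .map_err(|_| ExecuteError::Join("worker panicked".into()))?
}

fn main() {
    let result = run_query();
    assert_eq!(result.unwrap(), 42);
    println!("query result: 42");
}
```

The benefit over `expect` is that a panicked or cancelled worker surfaces as an ordinary `Err` the HTTP layer can map to a response, rather than tearing down the calling task.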
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- src/query/mod.rs (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (6)
src/query/mod.rs (6)
64-66: Good initialization of a dedicated runtime for queries.

The static runtime provides a single, shared Tokio runtime for all queries, which aligns with the PR's objective of avoiding runtime creation for each query. The use of `Lazy::new()` ensures the runtime is only initialized once, when first needed.
151-154: Signature change aligns with the centralized runtime approach.

The method now accepts a reference to a time partition instead of a stream name string, which fits well with the new architecture where time partition resolution happens in the outer `execute` function. This change helps avoid redundant work.

176-179: Consistent parameter changes to support the new approach.

The `get_dataframe` method has been updated to match the pattern used in `execute`, accepting a time partition reference instead of a stream name. This ensures consistency across the API and supports the centralized time partition resolution.
188-188: Parameter type simplification is appropriate.

Changed from taking a reference to an Option to directly taking an Option of a reference, which is more ergonomic and aligns with the parameter changes in other methods.
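A small sketch of the `Option<&String>` over `&Option<String>` point (the function name is made up for illustration): callers holding an owned `Option<String>` just call `.as_ref()`, and the `String` is never cloned.

```rust
// Hypothetical function written in the newer `Option<&String>` style.
fn partition_len(time_partition: Option<&String>) -> usize {
    time_partition.map(|p| p.len()).unwrap_or(0)
}

fn main() {
    let owned: Option<String> = Some("event_time".into());
    // `as_ref()` bridges from `Option<String>` to `Option<&String>` cheaply.
    assert_eq!(partition_len(owned.as_ref()), 10);
    assert_eq!(partition_len(None), 0);
    println!("ok");
}
```

A `None` literal also works directly at the call site, which `&Option<String>` would not allow without a temporary.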
504-509: Parameter type consistency maintained throughout the codebase.

The `transform` function's signature has been updated to match the changes in related functions, ensuring consistency across the module. This parameter type is also more efficient, as it avoids unnecessary cloning.
564-567: Simplified time_partition check with `is_some_and`.

The code now uses `is_some_and` for a more concise check against the time partition field. This improves readability while maintaining functionality.

Also applies to: 580-581
looks good
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
New Features

- `StreamNotFound`, a new error variant for enhanced error reporting related to stream retrieval issues.

Refactor