
Add cluster configuration to allow limiting the number of multi-stage queries running concurrently #14574

Merged: 3 commits into apache:master on Dec 18, 2024

Conversation

@yashmayya (Collaborator) commented Dec 2, 2024

  • Currently, there is no limit on the number of multi-stage queries that are executed concurrently. Every request that comes into a broker is immediately compiled into a query plan and dispatched to the servers for query execution.
  • On the servers, each stage in the query plan is submitted to a cached thread pool executor. This means that we don't have an effective way to limit the number of threads that are being utilized to execute multi-stage queries on servers. This can result in a very large number of multi-stage query executor threads in high QPS / complex query workloads leading to resource contention and performance degradation.
  • Simply using a fixed thread pool executor instead is not a good solution because this could lead to distributed deadlocks and query timeouts / failures. Similarly, it isn't trivial to implement the max concurrent query execution logic in the servers without considering global ordering.
  • Consider this example from @gortiz:
    • Given a query Q1 is formed by stages S11 and S12, where S11 depends on data from S12.
    • And a query Q2 is formed by stages S21 and S22, where S21 depends on data from S22
    • If Q1 and Q2 arrive at the same time and:
      • Server Srv1 receives Q1 first, starts executing S11 and ends up waiting for server Srv2 to execute S12
      • Server Srv2 receives Q2 first, starts executing S21 and ends up waiting for server Srv1 to execute S22
      • Server Srv1 receives Q2 and would execute S22, but sees that it has reached max number of concurrent queries, so it waits.
      • Server Srv2 receives Q1 and would execute S12, but sees that it has reached max number of concurrent queries, so it waits.
      • This leads to a situation where both the queries are blocked and will eventually timeout and fail.
  • This patch takes an alternative approach of limiting the number of concurrently executing multi-stage queries in the broker itself.
  • A new cluster level config is introduced to limit the number of multi-stage queries running concurrently. The config is a "per server" value and can be potentially re-used in the future if / when we move to server side throttling. This is disabled by default and is currently an opt-in throttling mechanism.
  • If set to a value > 0, the total capacity is divided among all the brokers - i.e., if the config value is set to 6 and there are 2 brokers and 3 servers in the cluster, each broker will be able to execute (6 × 3) / 2 = 9 multi-stage queries concurrently. If 9 queries are already executing on a broker, any subsequent incoming queries will wait for a slot to open up.
  • This is implemented using a custom semaphore that reacts to changes in the cluster config value or in the number of brokers / servers in the cluster, recalculating the total available permits. A custom semaphore implementation is needed because Semaphore::reducePermits - used when the number of brokers increases, the number of servers decreases, or the cluster config value for max concurrent queries decreases - has protected access. This method lets us reduce the semaphore's total number of permits without blocking.
  • Note that this first cut implementation is fairly primitive and assumes that queries are distributed evenly across the brokers and that each query will be executed on all the servers. Alternatively, we could check the actual servers that a query touches or even the number of workers to estimate the number of threads a query uses per server, but this would introduce additional complexity from both the user and developer POV.
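The custom-semaphore idea above can be sketched as follows. This is a simplified illustration of the AdjustableSemaphore class touched in this PR - the real implementation may differ in method names and details:

```java
import java.util.concurrent.Semaphore;

// Rough sketch of an adjustable semaphore: a subclass is needed because
// Semaphore::reducePermits has protected access. Simplified for illustration;
// not the PR's exact code.
class AdjustableSemaphore extends Semaphore {
    private int totalPermits;

    AdjustableSemaphore(int totalPermits) {
        super(totalPermits, true); // fair, so waiting queries acquire permits in arrival order
        this.totalPermits = totalPermits;
    }

    // Recalculate the total permits without blocking: release extra permits
    // when the total grows, and shrink via the protected reducePermits method
    // when it falls (e.g. more brokers, fewer servers, or a lower config value).
    synchronized void setPermits(int permits) {
        if (permits > totalPermits) {
            release(permits - totalPermits);
        } else if (permits < totalPermits) {
            reducePermits(totalPermits - permits);
        }
        totalPermits = permits;
    }
}
```

Note that reducePermits may drive the available-permit count negative if all permits are currently held; in-flight queries are unaffected, and new queries simply block until enough permits are released.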

@yashmayya added the enhancement and multi-stage (Related to the multi-stage query engine) labels on Dec 2, 2024
@codecov-commenter commented Dec 2, 2024

Codecov Report

Attention: Patch coverage is 48.12030% with 69 lines in your changes missing coverage. Please review.

Project coverage is 64.02%. Comparing base (59551e4) to head (c6f18c3).
Report is 1479 commits behind head on master.

Files with missing lines Patch % Lines
...requesthandler/MultiStageBrokerRequestHandler.java 1.88% 52 Missing ⚠️
...main/java/org/apache/pinot/common/utils/Timer.java 0.00% 8 Missing ⚠️
.../pinot/common/concurrency/AdjustableSemaphore.java 61.53% 3 Missing and 2 partials ⚠️
...roker/requesthandler/MultiStageQueryThrottler.java 92.72% 0 Missing and 4 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14574      +/-   ##
============================================
+ Coverage     61.75%   64.02%   +2.27%     
- Complexity      207     1605    +1398     
============================================
  Files          2436     2706     +270     
  Lines        133233   149013   +15780     
  Branches      20636    22828    +2192     
============================================
+ Hits          82274    95411   +13137     
- Misses        44911    46607    +1696     
- Partials       6048     6995     +947     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.97% <48.12%> (+2.26%) ⬆️
java-21 63.91% <48.12%> (+2.28%) ⬆️
skip-bytebuffers-false 64.02% <48.12%> (+2.27%) ⬆️
skip-bytebuffers-true 63.85% <48.12%> (+36.13%) ⬆️
temurin 64.02% <48.12%> (+2.27%) ⬆️
unittests 64.02% <48.12%> (+2.27%) ⬆️
unittests1 56.32% <0.00%> (+9.43%) ⬆️
unittests2 34.47% <48.12%> (+6.73%) ⬆️

Flags with carried forward coverage won't be shown.


@yashmayya yashmayya marked this pull request as ready for review December 2, 2024 10:50
@gortiz (Contributor) left a comment

Added some comments, mainly on code style. Anyway, we need to discuss whether we want to use these config options or some others.

@yashmayya force-pushed the multi-stage-max-concurrent-queries branch from 38ba313 to ea16de2 on December 16, 2024
@yashmayya (Collaborator, Author) left a comment

As discussed offline, I've made some changes to make the cluster config a "per server" value and also renamed it to indicate that it's "beta" and subject to change or even removal in future releases.
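The "per server" semantics can be illustrated with a small permit calculation. The class and method names below are invented for this example and are not the PR's actual code:

```java
// Hypothetical helper illustrating how a "per server" limit translates into
// per-broker permits, per the PR description: limit * numServers / numBrokers.
class QueryPermitCalculator {
    static int perBrokerPermits(int perServerLimit, int numServers, int numBrokers) {
        if (perServerLimit <= 0) {
            return -1; // a non-positive config value disables throttling entirely
        }
        return perServerLimit * numServers / numBrokers;
    }
}
```

With the example from the PR description, perBrokerPermits(6, 3, 2) yields 9 concurrent queries per broker.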

Comment on lines 44 to 45:
> * the server. This would allow for more fine-grained control over the number of queries being executed concurrently
> * (but there are some limitations around ordering and blocking that need to be solved first).
Contributor:

nit: I think we are merging two different things here. In my mind there are 3 options:

  1. Broad broker check, the one we are implementing, which simplifies the check by assuming queries are evenly distributed across brokers and servers.
  2. Ask servers if they can execute queries, where the broker asks the servers whether they can execute more queries or not (probably while workers / plan fragments are assigned). A server needs to be able to accept all of a query's associated workers atomically, which means a query that spawns a lot of workers on a single server may consume a lot of threads on that server.
  3. Ask servers if they can execute workers. This is the option that could lead to blocking if we don't care about ordering.

The difference between 2 and 3 is that queries requiring tons of workers may end up consuming a lot of threads in 2, while in 3 we would execute only some of the workers, and once they finish the others would start to run. To do so we need to execute the DAG in a consistent manner (and probably buffer some data).

Collaborator (Author):

Makes sense - I was assuming that for the 2nd option the check would happen before dispatch instead of during planning. Either way, this is just a rough sketch for a future idea, so I've removed this part of the Javadoc to avoid confusion. This PR can always be revisited for more context in the future.

@gortiz (Contributor) commented Dec 17, 2024

+1

@yashmayya yashmayya merged commit 6286109 into apache:master Dec 18, 2024
21 checks passed