[timeseries] Part-4: Complete Support for Multi-Server Queries #14676
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##            master    #14676       +/-  ##
============================================
+ Coverage      61.75%    63.97%    +2.22%
- Complexity       207      1608     +1401
============================================
  Files           2436      2707      +271
  Lines         133233    149242    +16009
  Branches       20636     22871     +2235
============================================
+ Hits           82274     95484    +13210
- Misses         44911     46763     +1852
- Partials        6048      6995      +947
Flags with carried forward coverage won't be shown.
LGTM!
        StandardCharsets.UTF_8))
        .build();
    responseObserver.onNext(response);
    for (int index = 0; index < fragmentOpChains.size(); index++) {
Will this block until all the fragments are executed? What happens when the query times out?
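To make the concern concrete, here is a minimal sketch (not taken from this PR) of how a single shared deadline could bound that loop; the class name, the dispatchAll method, and the use of Runnable as a stand-in for an op-chain are all assumptions for illustration:

import java.util.List;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch (not the PR's code): bound the per-fragment dispatch
// loop with one shared deadline so the broker never blocks past the query
// timeout; Runnable stands in for the real op-chain type.
public class DeadlineBoundedDispatchSketch {
  public void dispatchAll(List<Runnable> fragmentOpChains, long timeoutMs) throws TimeoutException {
    long deadlineMs = System.currentTimeMillis() + timeoutMs;
    for (int index = 0; index < fragmentOpChains.size(); index++) {
      long remainingMs = deadlineMs - System.currentTimeMillis();
      if (remainingMs <= 0) {
        // Fail fast instead of dispatching more work after the deadline has passed.
        throw new TimeoutException("Timed out before dispatching fragment " + index);
      }
      fragmentOpChains.get(index).run(); // stand-in for the real per-fragment dispatch call
    }
  }
}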
if (planNode instanceof LeafTimeSeriesPlanNode) {
  throw new IllegalStateException("Found leaf time series plan node in broker");
} else if (planNode instanceof TimeSeriesExchangeNode) {
  int numInputServers = numInputServersByExchangeNode.get(planNode.getId());
How will the input servers be computed?
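Not from the PR, but one plausible way that map could be populated: count, per exchange node, how many server instances host segments for the leaf node it reads from. In the sketch below, exchangeToLeafId and the class and method names are assumptions; leafIdToSegmentsByInstanceId mirrors the parameter of the same name seen elsewhere in this PR:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: derive each exchange node's fan-in from the routing
// table of the leaf node beneath it (leaf plan node id -> instance id -> segments).
public class ExchangeFanInSketch {
  public Map<String, Integer> computeNumInputServers(Map<String, String> exchangeToLeafId,
      Map<String, Map<String, List<String>>> leafIdToSegmentsByInstanceId) {
    Map<String, Integer> numInputServersByExchangeNode = new HashMap<>();
    for (Map.Entry<String, String> entry : exchangeToLeafId.entrySet()) {
      Map<String, List<String>> segmentsByInstanceId =
          leafIdToSegmentsByInstanceId.getOrDefault(entry.getValue(), Map.of());
      // The exchange receives one stream per server instance that hosts segments for the leaf.
      numInputServersByExchangeNode.put(entry.getKey(), segmentsByInstanceId.size());
    }
    return numInputServersByExchangeNode;
  }
}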
  }
}

TimeSeriesBlock submitAndGet(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs,
Can you share a short blurb about this method?
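Guessing at the contract from the signature alone: dispatch the dispatchable plan for the given request, block until the broker-side result is ready, and give up once timeoutMs elapses. A minimal sketch under those assumptions (the future-based handoff and all names here are hypothetical, not the PR's implementation):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the contract suggested by the signature: send the
// plan out, then block for the reduced result for at most timeoutMs.
public class SubmitAndGetSketch<T> {
  public T submitAndGet(long requestId, Runnable dispatchPlan, long timeoutMs,
      CompletableFuture<T> reducedResult) throws Exception {
    dispatchPlan.run();                                          // fan the plan fragments out to servers
    return reducedResult.get(timeoutMs, TimeUnit.MILLISECONDS);  // wait for the broker-side reduce, bounded by the timeout
  }
}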
 * buffer the data sent by the sender. This is set large enough that we should never hit this for any practical
 * use-case, while guarding us against bugs.
 */
public static final int MAX_QUEUE_CAPACITY = 4096;
Is this a per-server queue?
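For illustration only, a bounded-buffer sketch along those lines; the class, field, and method names are invented, and whether the real queue is per server or shared per dispatch is exactly the open question above:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a bounded queue buffers the blocks a sender streams
// back, so a runaway or buggy sender trips a hard failure instead of growing
// broker memory without limit.
public class BoundedReceiveBufferSketch<T> {
  public static final int MAX_QUEUE_CAPACITY = 4096;
  private final BlockingQueue<T> _blocks = new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY);

  public void onBlock(T block) {
    if (!_blocks.offer(block)) {
      // Hitting the cap should only happen on a bug, per the comment in the snippet above.
      throw new IllegalStateException("Receive buffer full: more than " + MAX_QUEUE_CAPACITY + " pending blocks");
    }
  }

  public T take() throws InterruptedException {
    return _blocks.take(); // consumer side: block until the next response block arrives
  }
}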
@@ -30,37 +35,57 @@
 * engine integration.
 */
public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSeriesResponse> {
Now that the broker is doing the reduce work, can the broker become a bottleneck? Did you perform any analysis of the broker in your cluster testing?
    List<BaseTimeSeriesPlanNode> planNodes, Map<String, Map<String, List<String>>> leafIdToSegmentsByInstanceId) {
  // TODO(timeseries): Handle this gracefully and return an empty block.
  Preconditions.checkState(!serverInstances.isEmpty(), "No servers selected for the query");
  if (serverInstances.size() == 1) {
Why do we need to differentiate between single-server and multi-server? Shouldn't it be transparent?
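One way to read the asymmetry the question points at (my interpretation, not the PR's stated rationale): with a single server there is nothing to merge, so its block can be returned as-is, while multiple servers force a broker-side combine. A generic sketch of that difference, with all names hypothetical:

import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical sketch: the single-server path can skip the combine step
// entirely, which is one plausible reason the two cases are handled
// differently rather than transparently.
public class BrokerReduceSketch<T> {
  public T reduce(List<T> serverBlocks, BinaryOperator<T> combiner) {
    if (serverBlocks.size() == 1) {
      return serverBlocks.get(0);                                // single server: return its block directly
    }
    return serverBlocks.stream().reduce(combiner).orElseThrow(); // multi server: merge blocks on the broker
  }
}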
Finishes the ongoing work to support Time Series queries on tables that can have numInstancesPerReplicaGroup > 1. The Timeseries Quickstart also now starts with 2 servers.
Also tested in one of our smaller clusters, where we were able to get 50-100 QPS consistently.