Skip to content

Commit

Permalink
Exclude secondary workload queries from some critical metrics (#14553)
Browse files Browse the repository at this point in the history
  • Loading branch information
vvivekiyer authored Dec 2, 2024
1 parent 2910caa commit 0ccc647
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -842,8 +842,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
if (QueryOptionsUtils.shouldDropResults(pinotQuery.getQueryOptions())) {
brokerResponse.setResultTable(null);
}
_brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs,
TimeUnit.MILLISECONDS);
if (QueryOptionsUtils.isSecondaryWorkload(pinotQuery.getQueryOptions())) {
_brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.SECONDARY_WORKLOAD_QUERY_TOTAL_TIME_MS, totalTimeMs,
TimeUnit.MILLISECONDS);
_brokerMetrics.addTimedValue(BrokerTimer.SECONDARY_WORKLOAD_QUERY_TOTAL_TIME_MS, totalTimeMs,
TimeUnit.MILLISECONDS);
} else {
_brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs,
TimeUnit.MILLISECONDS);
_brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs, TimeUnit.MILLISECONDS);
}

// Log query and stats
_queryLogger.log(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.transport.AsyncQueryResponse;
import org.apache.pinot.core.transport.QueryResponse;
Expand Down Expand Up @@ -116,7 +117,9 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
_failureDetector.notifyQuerySubmitted(asyncQueryResponse);
Map<ServerRoutingInstance, ServerResponse> finalResponses = asyncQueryResponse.getFinalResponses();
if (asyncQueryResponse.getStatus() == QueryResponse.Status.TIMED_OUT) {
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1);
BrokerMeter meter = QueryOptionsUtils.isSecondaryWorkload(serverBrokerRequest.getPinotQuery().getQueryOptions())
? BrokerMeter.SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS : BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS;
_brokerMetrics.addMeteredTableValue(rawTableName, meter, 1);
}
_failureDetector.notifyQueryFinished(asyncQueryResponse);
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.SCATTER_GATHER,
Expand Down Expand Up @@ -162,7 +165,11 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
if (numServersNotResponded != 0) {
brokerResponse.addException(new QueryProcessingException(QueryException.SERVER_NOT_RESPONDING_ERROR_CODE,
String.format("%d servers %s not responded", numServersNotResponded, serversNotResponded)));
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED, 1);

BrokerMeter meter = QueryOptionsUtils.isSecondaryWorkload(serverBrokerRequest.getPinotQuery().getQueryOptions())
? BrokerMeter.SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED
: BrokerMeter.BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED;
_brokerMetrics.addMeteredTableValue(rawTableName, meter, 1);
}
if (brokerResponse.getExceptionsSize() > 0) {
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_PROCESSING_EXCEPTIONS, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
// (numServersQueried > numServersResponded)
BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED("badResponses", false),

SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED("badResponses", false),

BROKER_RESPONSES_WITH_TIMEOUTS("badResponses", false),

SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS("badResponses", false),

// This metric track the number of broker responses with number of groups limit reached (potential bad responses).
BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED("badResponses", false),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public enum BrokerTimer implements AbstractMetrics.Timer {
ROUTING_TABLE_UPDATE_TIME(true),
CLUSTER_CHANGE_QUEUE_TIME(true), // metric tracking the freshness lag for consuming segments
FRESHNESS_LAG_MS(false),
QUERY_TOTAL_TIME_MS(false),
QUERY_TOTAL_TIME_MS(true),

SECONDARY_WORKLOAD_QUERY_TOTAL_TIME_MS(true),

// The latency of sending the request from broker to server
NETTY_CONNECTION_SEND_REQUEST_LATENCY(false),
Expand Down

0 comments on commit 0ccc647

Please sign in to comment.