Skip to content

Commit

Permalink
Add cluster configuration to allow limiting the number of multi-stage…
Browse files Browse the repository at this point in the history
… queries running concurrently
  • Loading branch information
yashmayya committed Dec 16, 2024
1 parent e89ac94 commit 185fff5
Show file tree
Hide file tree
Showing 7 changed files with 645 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.broker.AccessControlFactory;
Expand All @@ -48,6 +50,7 @@
import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate;
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageQuerySemaphore;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
Expand Down Expand Up @@ -137,6 +140,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
protected HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
protected MultiStageQuerySemaphore _multiStageQuerySemaphore;

@Override
public void init(PinotConfiguration brokerConf)
Expand Down Expand Up @@ -335,13 +339,26 @@ public void start()
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
QueryDispatcher queryDispatcher = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
int numBrokers = Math.max(1, (int) _helixAdmin
.getInstancesInCluster(_spectatorHelixManager.getClusterName())
.stream()
.filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE))
.count());
int maxConcurrentQueries = Integer.parseInt(
_helixAdmin.getConfig(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_spectatorHelixManager.getClusterName()).build(),
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
_multiStageQuerySemaphore = new MultiStageQuerySemaphore(numBrokers, maxConcurrentQueries);
_multiStageQuerySemaphore.init(_spectatorHelixManager);
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
queryDispatcher = createQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache);
_queryQuotaManager, tableCache, _multiStageQuerySemaphore);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
Expand Down Expand Up @@ -380,6 +397,7 @@ public void start()
clusterConfigChangeHandler.init(_spectatorHelixManager);
}
_clusterConfigChangeHandlers.add(_queryQuotaManager);
_clusterConfigChangeHandlers.add(_multiStageQuerySemaphore);
for (ClusterChangeHandler idealStateChangeHandler : _idealStateChangeHandlers) {
idealStateChangeHandler.init(_spectatorHelixManager);
}
Expand All @@ -389,6 +407,7 @@ public void start()
}
_externalViewChangeHandlers.add(_routingManager);
_externalViewChangeHandlers.add(_queryQuotaManager);
_externalViewChangeHandlers.add(_multiStageQuerySemaphore);
for (ClusterChangeHandler instanceConfigChangeHandler : _instanceConfigChangeHandlers) {
instanceConfigChangeHandler.init(_spectatorHelixManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand All @@ -52,6 +53,7 @@
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.ExceptionUtils;
import org.apache.pinot.common.utils.Timer;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.core.auth.Actions;
Expand Down Expand Up @@ -87,9 +89,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final WorkerManager _workerManager;
private final QueryDispatcher _queryDispatcher;
private final boolean _explainAskingServerDefault;
private final MultiStageQuerySemaphore _querySemaphore;

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
MultiStageQuerySemaphore querySemaphore) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
Expand All @@ -105,6 +109,7 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
_explainAskingServerDefault = _config.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN,
CommonConstants.MultiStageQueryRunner.DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN);
_querySemaphore = querySemaphore;
}

@Override
Expand Down Expand Up @@ -136,14 +141,12 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
boolean inferPartitionHint = _config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT,
CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT);
//@formatter:off
QueryEnvironment queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder()
.database(database)
.tableCache(_tableCache)
.workerManager(_workerManager)
.defaultInferPartitionHint(inferPartitionHint)
.build());
//@formatter:on
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
boolean askServers = QueryOptionsUtils.isExplainAskingServers(queryOptions)
Expand Down Expand Up @@ -224,67 +227,89 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
}

Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId), ThreadExecutionContext.TaskType.MSE);

long executionStartTimeNs = System.nanoTime();
QueryDispatcher.QueryResult queryResults;
Timer queryTimer = new Timer(queryTimeoutMs);
try {
queryResults =
_queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions);
} catch (TimeoutException e) {
for (String table : tableNames) {
_brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1);
// It's fine to block in this thread because we use a separate thread pool from the main Jersey server to process
// these requests.
if (!_querySemaphore.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Timed out waiting to execute request {}: {}", requestId, query);
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
}
LOGGER.warn("Timed out executing request {}: {}", requestId, query);
} catch (InterruptedException e) {
LOGGER.warn("Interrupt received while waiting to execute request {}: {}", requestId, query);
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
} catch (Throwable t) {
String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
return new BrokerResponseNative(
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage));
} finally {
Tracing.getThreadAccountant().clear();
}
long executionEndTimeNs = System.nanoTime();
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, executionEndTimeNs - executionStartTimeNs);

BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
brokerResponse.setResultTable(queryResults.getResultTable());
brokerResponse.setTablesQueried(tableNames);
// TODO: Add servers queried/responded stats
brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs());

// Attach unavailable segments
int numUnavailableSegments = 0;
for (Map.Entry<String, Set<String>> entry : dispatchableSubPlan.getTableToUnavailableSegmentsMap().entrySet()) {
String tableName = entry.getKey();
Set<String> unavailableSegments = entry.getValue();
int unavailableSegmentsInSubPlan = unavailableSegments.size();
numUnavailableSegments += unavailableSegmentsInSubPlan;
brokerResponse.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR,
String.format("Found %d unavailable segments for table %s: %s", unavailableSegmentsInSubPlan, tableName,
toSizeLimitedString(unavailableSegments, NUM_UNAVAILABLE_SEGMENTS_TO_LOG))));
}
requestContext.setNumUnavailableSegments(numUnavailableSegments);

fillOldBrokerResponseStats(brokerResponse, queryResults.getQueryStats(), dispatchableSubPlan);
try {
Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId), ThreadExecutionContext.TaskType.MSE);

long executionStartTimeNs = System.nanoTime();
QueryDispatcher.QueryResult queryResults;
try {
queryResults =
_queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimer.getRemainingTime(),
queryOptions);
} catch (TimeoutException e) {
for (String table : tableNames) {
_brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1);
}
LOGGER.warn("Timed out executing request {}: {}", requestId, query);
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
} catch (Throwable t) {
String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
return new BrokerResponseNative(
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage));
} finally {
Tracing.getThreadAccountant().clear();
}
long executionEndTimeNs = System.nanoTime();
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION,
executionEndTimeNs - executionStartTimeNs);

BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
brokerResponse.setResultTable(queryResults.getResultTable());
brokerResponse.setTablesQueried(tableNames);
// TODO: Add servers queried/responded stats
brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs());

// Attach unavailable segments
int numUnavailableSegments = 0;
for (Map.Entry<String, Set<String>> entry : dispatchableSubPlan.getTableToUnavailableSegmentsMap().entrySet()) {
String tableName = entry.getKey();
Set<String> unavailableSegments = entry.getValue();
int unavailableSegmentsInSubPlan = unavailableSegments.size();
numUnavailableSegments += unavailableSegmentsInSubPlan;
brokerResponse.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR,
String.format("Found %d unavailable segments for table %s: %s", unavailableSegmentsInSubPlan, tableName,
toSizeLimitedString(unavailableSegments, NUM_UNAVAILABLE_SEGMENTS_TO_LOG))));
}
requestContext.setNumUnavailableSegments(numUnavailableSegments);

// Set total query processing time
// TODO: Currently we don't emit metric for QUERY_TOTAL_TIME_MS
long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
brokerResponse.setTimeUsedMs(totalTimeMs);
augmentStatistics(requestContext, brokerResponse);
if (QueryOptionsUtils.shouldDropResults(queryOptions)) {
brokerResponse.setResultTable(null);
}
fillOldBrokerResponseStats(brokerResponse, queryResults.getQueryStats(), dispatchableSubPlan);

// Log query and stats
_queryLogger.log(
new QueryLogger.QueryLogParams(requestContext, tableNames.toString(), brokerResponse, requesterIdentity, null));
// Set total query processing time
// TODO: Currently we don't emit metric for QUERY_TOTAL_TIME_MS
long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
brokerResponse.setTimeUsedMs(totalTimeMs);
augmentStatistics(requestContext, brokerResponse);
if (QueryOptionsUtils.shouldDropResults(queryOptions)) {
brokerResponse.setResultTable(null);
}

return brokerResponse;
// Log query and stats
_queryLogger.log(
new QueryLogger.QueryLogParams(requestContext, tableNames.toString(), brokerResponse, requesterIdentity,
null));

return brokerResponse;
} finally {
_querySemaphore.release();
}
}

private Collection<PlanNode> requestPhysicalPlan(DispatchablePlanFragment fragment,
Expand Down
Loading

0 comments on commit 185fff5

Please sign in to comment.