Skip to content

Commit

Permalink
Change config to per server instead of cluster level; address review …
Browse files Browse the repository at this point in the history
…comments
  • Loading branch information
yashmayya committed Dec 16, 2024
1 parent 185fff5 commit ea16de2
Show file tree
Hide file tree
Showing 7 changed files with 462 additions and 376 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.broker.AccessControlFactory;
Expand All @@ -50,7 +48,7 @@
import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate;
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageQuerySemaphore;
import org.apache.pinot.broker.requesthandler.MultiStageQueryThrottler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.TimeSeriesRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
Expand Down Expand Up @@ -140,7 +138,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
protected HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
protected MultiStageQuerySemaphore _multiStageQuerySemaphore;
protected MultiStageQueryThrottler _multiStageQueryThrottler;

@Override
public void init(PinotConfiguration brokerConf)
Expand Down Expand Up @@ -339,26 +337,15 @@ public void start()
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
QueryDispatcher queryDispatcher = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
int numBrokers = Math.max(1, (int) _helixAdmin
.getInstancesInCluster(_spectatorHelixManager.getClusterName())
.stream()
.filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE))
.count());
int maxConcurrentQueries = Integer.parseInt(
_helixAdmin.getConfig(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_spectatorHelixManager.getClusterName()).build(),
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
_multiStageQuerySemaphore = new MultiStageQuerySemaphore(numBrokers, maxConcurrentQueries);
_multiStageQuerySemaphore.init(_spectatorHelixManager);
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_spectatorHelixManager);
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
queryDispatcher = createQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache, _multiStageQuerySemaphore);
_queryQuotaManager, tableCache, _multiStageQueryThrottler);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
Expand Down Expand Up @@ -397,7 +384,7 @@ public void start()
clusterConfigChangeHandler.init(_spectatorHelixManager);
}
_clusterConfigChangeHandlers.add(_queryQuotaManager);
_clusterConfigChangeHandlers.add(_multiStageQuerySemaphore);
_clusterConfigChangeHandlers.add(_multiStageQueryThrottler);
for (ClusterChangeHandler idealStateChangeHandler : _idealStateChangeHandlers) {
idealStateChangeHandler.init(_spectatorHelixManager);
}
Expand All @@ -407,7 +394,7 @@ public void start()
}
_externalViewChangeHandlers.add(_routingManager);
_externalViewChangeHandlers.add(_queryQuotaManager);
_externalViewChangeHandlers.add(_multiStageQuerySemaphore);
_externalViewChangeHandlers.add(_multiStageQueryThrottler);
for (ClusterChangeHandler instanceConfigChangeHandler : _instanceConfigChangeHandlers) {
instanceConfigChangeHandler.init(_spectatorHelixManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final WorkerManager _workerManager;
private final QueryDispatcher _queryDispatcher;
private final boolean _explainAskingServerDefault;
private final MultiStageQuerySemaphore _querySemaphore;
private final MultiStageQueryThrottler _queryThrottler;

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
MultiStageQuerySemaphore querySemaphore) {
MultiStageQueryThrottler queryThrottler) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache);
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
Expand All @@ -109,7 +109,7 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
_explainAskingServerDefault = _config.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN,
CommonConstants.MultiStageQueryRunner.DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN);
_querySemaphore = querySemaphore;
_queryThrottler = queryThrottler;
}

@Override
Expand Down Expand Up @@ -231,7 +231,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
try {
// It's fine to block in this thread because we use a separate thread pool from the main Jersey server to process
// these requests.
if (!_querySemaphore.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) {
if (!_queryThrottler.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Timed out waiting to execute request {}: {}", requestId, query);
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
Expand Down Expand Up @@ -308,7 +308,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO

return brokerResponse;
} finally {
_querySemaphore.release();
_queryThrottler.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,125 +18,120 @@
*/
package org.apache.pinot.broker.requesthandler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.concurrent.Semaphore;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.broker.broker.helix.ClusterChangeHandler;
import org.apache.pinot.common.concurrency.AdjustableSemaphore;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* This class helps limit the number of multi-stage queries being executed concurrently. Currently, this limit is
* applied at the broker level, but could be moved to the server level in the future.
* applied at the broker level, but could be moved to the server level in the future. Note that the cluster
* configuration is a "per server" value, and the broker currently assumes that every query is executed across all
* servers. Another assumption is that queries are evenly distributed across brokers, so each broker allows up to
* max(1, maxConcurrentQueries * numServers / numBrokers) concurrent queries. Ideally, we want to move to a model where
* the broker asks each server whether it can execute a query stage before dispatching that stage to the server. This
* would allow more fine-grained control over the number of queries being executed concurrently (but there are some
* limitations around ordering and blocking that need to be solved first).
*/
public class MultiStageQuerySemaphore extends Semaphore implements ClusterChangeHandler {
public class MultiStageQueryThrottler implements ClusterChangeHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageQuerySemaphore.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageQueryThrottler.class);

private HelixManager _helixManager;
private HelixAdmin _helixAdmin;
private HelixConfigScope _helixConfigScope;
private int _numBrokers;
private int _numServers;
// If _maxConcurrentQueries is <= 0, it means that the cluster is not configured to limit the number of multi-stage
// queries that can be executed concurrently. In this case, we should not block the query.
private int _maxConcurrentQueries;
private int _totalPermits;

public MultiStageQuerySemaphore(int numBrokers, int maxConcurrentQueries) {
super(Math.max(1, maxConcurrentQueries / Math.max(numBrokers, 1)), true);
_maxConcurrentQueries = maxConcurrentQueries;
_numBrokers = Math.max(1, numBrokers);
_totalPermits =
maxConcurrentQueries > 0 ? Math.max(1, maxConcurrentQueries / Math.max(numBrokers, 1)) : maxConcurrentQueries;
}

@Override
public void acquire()
throws InterruptedException {
// If _totalPermits is <= 0, it means that the cluster is not configured to limit the number of multi-stage queries
// that can be executed concurrently. In this case, we should not block the query.
if (_totalPermits > 0) {
super.acquire();
}
}
private AdjustableSemaphore _semaphore;

@Override
public void acquireUninterruptibly() {
// If _totalPermits is <= 0, it means that the cluster is not configured to limit the number of multi-stage queries
// that can be executed concurrently. In this case, we should not block the query.
if (_totalPermits > 0) {
super.acquireUninterruptibly();
}
}
public void init(HelixManager helixManager) {
_helixManager = helixManager;
_helixAdmin = _helixManager.getClusterManagmentTool();
_helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_helixManager.getClusterName()).build();

@Override
public boolean tryAcquire() {
// If _totalPermits is <= 0, it means that the cluster is not configured to limit the number of multi-stage queries
// that can be executed concurrently. In this case, we should not block the query.
if (_totalPermits > 0) {
return super.tryAcquire();
} else {
return true;
_maxConcurrentQueries = Integer.parseInt(
_helixAdmin.getConfig(_helixConfigScope,
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));

List<String> clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName());
_numBrokers = Math.max(1, (int) clusterInstances.stream()
.filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE))
.count());
_numServers = Math.max(1, (int) clusterInstances.stream()
.filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))
.count());

if (_maxConcurrentQueries > 0) {
_semaphore = new AdjustableSemaphore(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers), true);
}
}

@Override
/**
* Returns true if the query can be executed (waiting until it can be executed if necessary), false otherwise.
* <p>
* {@link #release()} should be called after the query is done executing. It is the responsibility of the caller to
* ensure that {@link #release()} is called exactly once for each call to this method.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @throws InterruptedException if the current thread is interrupted
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
// If _totalPermits is <= 0, it means that the cluster is not configured to limit the number of multi-stage queries
// that can be executed concurrently. In this case, we should not block the query.
if (_totalPermits > 0) {
return super.tryAcquire(timeout, unit);
} else {
if (_maxConcurrentQueries <= 0) {
return true;
}
return _semaphore.tryAcquire(timeout, unit);
}

@Override
/**
* Should be called after the query is done executing. It is the responsibility of the caller to ensure that this
* method is called exactly once for each call to {@link #tryAcquire(long, TimeUnit)}.
*/
public void release() {
if (_totalPermits > 0) {
super.release();
if (_maxConcurrentQueries > 0) {
_semaphore.release();
}
}

@Override
public void init(HelixManager helixManager) {
_helixManager = helixManager;
_helixAdmin = _helixManager.getClusterManagmentTool();
_helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_helixManager.getClusterName()).build();
}

@Override
public void processClusterChange(HelixConstants.ChangeType changeType) {
Preconditions.checkArgument(
changeType == HelixConstants.ChangeType.EXTERNAL_VIEW || changeType == HelixConstants.ChangeType.CLUSTER_CONFIG,
"MultiStageQuerySemaphore can only handle EXTERNAL_VIEW and CLUSTER_CONFIG changes");

if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
int numBrokers =
Math.max(1, (int) _helixAdmin
.getInstancesInCluster(_helixManager.getClusterName())
.stream()
.filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE))
.count());

if (numBrokers != _numBrokers) {
List<String> clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName());
int numBrokers = Math.max(1, (int) clusterInstances.stream()
.filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE))
.count());
int numServers = Math.max(1, (int) clusterInstances.stream()
.filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))
.count());

if (numBrokers != _numBrokers || numServers != _numServers) {
_numBrokers = numBrokers;
_numServers = numServers;
if (_maxConcurrentQueries > 0) {
int newTotalPermits = Math.max(1, _maxConcurrentQueries / _numBrokers);
if (newTotalPermits > _totalPermits) {
release(newTotalPermits - _totalPermits);
} else if (newTotalPermits < _totalPermits) {
reducePermits(_totalPermits - newTotalPermits);
}
_totalPermits = newTotalPermits;
_semaphore.setPermits(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers));
}
}
} else {
Expand All @@ -160,15 +155,14 @@ public void processClusterChange(HelixConstants.ChangeType changeType) {
}

if (maxConcurrentQueries > 0) {
int newTotalPermits = Math.max(1, maxConcurrentQueries / _numBrokers);
if (newTotalPermits > _totalPermits) {
release(newTotalPermits - _totalPermits);
} else if (newTotalPermits < _totalPermits) {
reducePermits(_totalPermits - newTotalPermits);
}
_totalPermits = newTotalPermits;
_semaphore.setPermits(Math.max(1, maxConcurrentQueries * _numServers / _numBrokers));
}
_maxConcurrentQueries = maxConcurrentQueries;
}
}

/**
 * Test-only accessor that reports how many permits the underlying semaphore currently has available.
 *
 * NOTE(review): the semaphore appears to be created only when the configured max concurrent queries value is
 * positive, so calling this while throttling is disabled would presumably fail on a null reference - confirm.
 *
 * @return the value reported by the underlying semaphore's {@code availablePermits()}
 */
@VisibleForTesting
int availablePermits() {
  int currentlyAvailable = _semaphore.availablePermits();
  return currentlyAvailable;
}
}
Loading

0 comments on commit ea16de2

Please sign in to comment.