Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up BrokerRequestHandler and BrokerResponse #13179

Merged
merged 1 commit into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.inject.Inject;
Expand All @@ -60,6 +61,7 @@
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
Expand Down Expand Up @@ -295,6 +297,7 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage)
throws Exception {
long requestArrivalTimeMs = System.currentTimeMillis();
SqlNodeAndOptions sqlNodeAndOptions;
try {
sqlNodeAndOptions = RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), sqlRequestJson);
Expand All @@ -311,9 +314,10 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
}
switch (sqlType) {
case DQL:
try (RequestScope requestStatistics = Tracing.getTracer().createRequestScope()) {
return _requestHandler.handleRequest(sqlRequestJson, sqlNodeAndOptions, httpRequesterIdentity,
requestStatistics, httpHeaders);
try (RequestScope requestContext = Tracing.getTracer().createRequestScope()) {
requestContext.setRequestArrivalTimeMillis(requestArrivalTimeMs);
return _requestHandler.handleRequest(sqlRequestJson, sqlNodeAndOptions, httpRequesterIdentity, requestContext,
httpHeaders);
} catch (Exception e) {
LOGGER.error("Error handling DQL request:\n{}\nException: {}", sqlRequestJson,
QueryException.getTruncatedStackTrace(e));
Expand Down Expand Up @@ -361,10 +365,10 @@ private static HttpRequesterIdentity makeHttpIdentity(org.glassfish.grizzly.http
static Response getPinotQueryResponse(BrokerResponse brokerResponse)
throws Exception {
int queryErrorCodeHeaderValue = -1; // default value of the header.

if (brokerResponse.getExceptionsSize() != 0) {
List<QueryProcessingException> exceptions = brokerResponse.getExceptions();
if (!exceptions.isEmpty()) {
// set the header value as first exception error code value.
queryErrorCodeHeaderValue = brokerResponse.getProcessingExceptions().get(0).getErrorCode();
queryErrorCodeHeaderValue = exceptions.get(0).getErrorCode();
}

// returning the Response with OK status and header value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BaseSingleStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate;
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
Expand Down Expand Up @@ -72,8 +73,7 @@
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
Expand Down Expand Up @@ -128,7 +128,6 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected HelixManager _participantHelixManager;
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
protected BrokerQueryEventListener _brokerQueryEventListener;

@Override
public void init(PinotConfiguration brokerConf)
Expand All @@ -139,8 +138,8 @@ public void init(PinotConfiguration brokerConf)
_clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME);
ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, _clusterName, ServiceRole.BROKER);

PinotInsecureMode.setPinotInInsecureMode(
Boolean.valueOf(_brokerConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
PinotInsecureMode.setPinotInInsecureMode(Boolean.valueOf(
_brokerConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));

if (_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
Expand Down Expand Up @@ -277,8 +276,7 @@ public void start()
final PinotConfiguration factoryConf = _brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX);
// Adding cluster name to the config so that it can be used by the AccessControlFactory
factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, _brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME));
_accessControlFactory =
AccessControlFactory.loadFactory(factoryConf, _propertyStore);
_accessControlFactory = AccessControlFactory.loadFactory(factoryConf, _propertyStore);
HelixExternalViewBasedQueryQuotaManager queryQuotaManager =
new HelixExternalViewBasedQueryQuotaManager(_brokerMetrics, _instanceId);
queryQuotaManager.init(_spectatorHelixManager);
Expand All @@ -292,49 +290,42 @@ public void start()
boolean caseInsensitive =
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);
// Configure TLS for netty connection to server
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);

LOGGER.info("Initializing Broker Event Listener Factory");
_brokerQueryEventListener = PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener(
_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));
BrokerQueryEventListenerFactory.init(_brokerConf.subset(Broker.EVENT_LISTENER_CONFIG_PREFIX));

// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
String brokerRequestHandlerType =
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
BrokerRequestHandler singleStageBrokerRequestHandler = null;
BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler;
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics, null, _brokerQueryEventListener);
} else { // default request handler type, e.g. netty
tableCache);
} else {
// Default request handler type, i.e. netty
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
// Configure TLS for netty connection to server
TlsConfig tlsDefaults = null;
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
_brokerQueryEventListener);
} else {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager,
_brokerQueryEventListener);
tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
}

BrokerRequestHandler multiStageBrokerRequestHandler = null;
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, _brokerQueryEventListener);
queryQuotaManager, tableCache);
}

_brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageBrokerRequestHandler,
multiStageBrokerRequestHandler, _brokerMetrics);
_brokerRequestHandler =
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler);
_brokerRequestHandler.start();

// Enable/disable thread CPU time measurement through instance config.
Expand All @@ -345,8 +336,8 @@ public void start()
ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
Tracing.ThreadAccountantOps
.initializeThreadAccountant(_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);
Tracing.ThreadAccountantOps.initializeThreadAccountant(
_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId);

String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
if (controllerUrl != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BaseSingleStageBrokerRequestHandler.ServerStats;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -112,35 +111,27 @@ public double getLogRateLimit() {
}

private boolean shouldForceLog(QueryLogParams params) {
return params._response.isNumGroupsLimitReached() || params._response.getExceptionsSize() > 0
|| params._timeUsedMs > TimeUnit.SECONDS.toMillis(1);
return params._response.isPartialResult() || params._response.getTimeUsedMs() > TimeUnit.SECONDS.toMillis(1);
}

public static class QueryLogParams {
final long _requestId;
final String _query;
final RequestContext _requestContext;
final String _table;
final int _numUnavailableSegments;
@Nullable
final BaseBrokerRequestHandler.ServerStats _serverStats;
final ServerStats _serverStats;
final BrokerResponse _response;
final long _timeUsedMs;
@Nullable
final RequesterIdentity _requester;

public QueryLogParams(long requestId, String query, RequestContext requestContext, String table,
int numUnavailableSegments, @Nullable BaseBrokerRequestHandler.ServerStats serverStats, BrokerResponse response,
long timeUsedMs, @Nullable RequesterIdentity requester) {
_requestId = requestId;
public QueryLogParams(String query, String table, int numUnavailableSegments, @Nullable ServerStats serverStats,
BrokerResponse response, @Nullable RequesterIdentity requester) {
_query = query;
_table = table;
_timeUsedMs = timeUsedMs;
_requestContext = requestContext;
_requester = requester;
_response = response;
_serverStats = serverStats;
_numUnavailableSegments = numUnavailableSegments;
_serverStats = serverStats;
_response = response;
_requester = requester;
}
}

Expand All @@ -152,7 +143,7 @@ private enum QueryLogEntry {
REQUEST_ID("requestId") {
@Override
void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
builder.append(params._requestId);
builder.append(params._response.getRequestId());
}
},
TABLE("table") {
Expand All @@ -164,7 +155,7 @@ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params)
TIME_MS("timeMs") {
@Override
void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
builder.append(params._timeUsedMs);
builder.append(params._response.getTimeUsedMs());
}
},
DOCS("docs") {
Expand Down Expand Up @@ -215,7 +206,7 @@ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params)
BROKER_REDUCE_TIME_MS("brokerReduceTimeMs") {
@Override
void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
builder.append(params._requestContext.getReduceTimeMillis());
builder.append(params._response.getBrokerReduceTimeMs());
}
},
EXCEPTIONS("exceptions") {
Expand Down
Loading
Loading