[timeseries] Part-4: Complete Support for Multi-Server Queries
ankitsultana committed Dec 17, 2024
1 parent 246b98c commit d668f3a
Showing 14 changed files with 389 additions and 203 deletions.
==========================================================================
@@ -117,6 +117,10 @@ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String
       if (timeSeriesResponse == null
           || timeSeriesResponse.getStatus().equals(PinotBrokerTimeSeriesResponse.ERROR_STATUS)) {
         _brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED, 1);
+        final String errorMessage = timeSeriesResponse == null ? "null time-series response"
+            : timeSeriesResponse.getError();
+        // TODO(timeseries): Remove logging for failed queries.
+        LOGGER.warn("time-series query failed with error: {}", errorMessage);
       }
     }
   }
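The hunk above makes broker-side failures observable: a null response and an explicit error status are both counted and logged. A minimal, self-contained sketch of that guard, with a hypothetical stand-in for the response type (only the two accessors used above are modeled, and the "error" status value is an assumption, not verified against PinotBrokerTimeSeriesResponse):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimeSeriesFailureGuardSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesFailureGuardSketch.class);
  // Assumed to match PinotBrokerTimeSeriesResponse.ERROR_STATUS; placeholder value.
  private static final String ERROR_STATUS = "error";

  // Hypothetical view of PinotBrokerTimeSeriesResponse with just the accessors the diff uses.
  interface ResponseView {
    String getStatus();
    String getError();
  }

  static boolean isFailed(ResponseView timeSeriesResponse) {
    // Mirrors the diff: a null response counts as a failure, as does an explicit error status.
    if (timeSeriesResponse == null || ERROR_STATUS.equals(timeSeriesResponse.getStatus())) {
      String errorMessage = timeSeriesResponse == null ? "null time-series response"
          : timeSeriesResponse.getError();
      LOGGER.warn("time-series query failed with error: {}", errorMessage);
      return true;
    }
    return false;
  }
}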
==========================================================================
@@ -20,11 +20,11 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
+import com.google.common.collect.ImmutableMap;
 import io.grpc.stub.StreamObserver;
-import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,12 +36,12 @@
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.query.executor.QueryExecutor;
@@ -69,6 +69,7 @@
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
 import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesServerPlanVisitor;
 import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
+import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.executor.ExecutorServiceUtils;
@@ -258,45 +259,61 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map
    * TODO: This design is at odds with MSE because MSE runs even the leaf stage via OpChainSchedulerService.
    * However, both OpChain scheduler and this method use the same ExecutorService.
    */
-  public void processTimeSeriesQuery(String serializedPlan, Map<String, String> metadata,
+  public void processTimeSeriesQuery(List<String> serializedPlanFragments, Map<String, String> metadata,
       StreamObserver<Worker.TimeSeriesResponse> responseObserver) {
     // Define a common way to handle errors.
-    final Consumer<Throwable> handleErrors = (t) -> {
-      Map<String, String> errorMetadata = new HashMap<>();
-      errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE, t.getClass().getSimpleName());
-      errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE, t.getMessage() == null
-          ? "Unknown error: no message" : t.getMessage());
-      responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build());
-      responseObserver.onCompleted();
+    final Consumer<Pair<Throwable, String>> handleErrors = (pair) -> {
+      Throwable t = pair.getLeft();
+      try {
+        String planId = pair.getRight();
+        Map<String, String> errorMetadata = new HashMap<>();
+        errorMetadata.put(WorkerResponseMetadataKeys.ERROR_TYPE, t.getClass().getSimpleName());
+        errorMetadata.put(WorkerResponseMetadataKeys.ERROR_MESSAGE, t.getMessage() == null
+            ? "Unknown error: no message" : t.getMessage());
+        errorMetadata.put(WorkerResponseMetadataKeys.PLAN_ID, planId);
+        // TODO(timeseries): remove logging for failed queries.
+        LOGGER.warn("time-series query failed:", t);
+        responseObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build());
+        responseObserver.onCompleted();
+      } catch (Throwable t2) {
+        LOGGER.warn("Unable to send error to broker. Original error: {}", t.getMessage(), t2);
+      }
     };
     try {
       final long deadlineMs = extractDeadlineMs(metadata);
       Preconditions.checkState(System.currentTimeMillis() < deadlineMs,
-          "Query timed out before getting processed in server. Remaining time: %s", deadlineMs);
-      // Deserialize plan, and compile to create a tree of operators.
-      BaseTimeSeriesPlanNode rootNode = TimeSeriesPlanSerde.deserialize(serializedPlan);
+          "Query timed out before getting processed in server. Exceeded time by (ms): %s",
+          System.currentTimeMillis() - deadlineMs);
+      List<BaseTimeSeriesPlanNode> fragmentRoots = serializedPlanFragments.stream()
+          .map(TimeSeriesPlanSerde::deserialize).collect(Collectors.toList());
       TimeSeriesExecutionContext context = new TimeSeriesExecutionContext(
-          metadata.get(WorkerRequestMetadataKeys.LANGUAGE), extractTimeBuckets(metadata),
-          extractPlanToSegmentMap(metadata), deadlineMs, metadata);
-      BaseTimeSeriesOperator operator = _timeSeriesPhysicalPlanVisitor.compile(rootNode, context);
+          metadata.get(WorkerRequestMetadataKeys.LANGUAGE), extractTimeBuckets(metadata), deadlineMs, metadata,
+          extractPlanToSegmentMap(metadata), Collections.emptyMap());
+      final List<BaseTimeSeriesOperator> fragmentOpChains = fragmentRoots.stream().map(x -> {
+        return _timeSeriesPhysicalPlanVisitor.compile(x, context);
+      }).collect(Collectors.toList());
       // Run the operator using the same executor service as OpChainSchedulerService
       _executorService.submit(() -> {
+        String currentPlanId = "";
         try {
-          TimeSeriesBlock seriesBlock = operator.nextBlock();
-          Worker.TimeSeriesResponse response = Worker.TimeSeriesResponse.newBuilder()
-              .setPayload(ByteString.copyFrom(
-                  PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(seriesBlock).serialize(),
-                  StandardCharsets.UTF_8))
-              .build();
-          responseObserver.onNext(response);
+          for (int index = 0; index < fragmentOpChains.size(); index++) {
+            currentPlanId = fragmentRoots.get(index).getId();
+            BaseTimeSeriesOperator fragmentOpChain = fragmentOpChains.get(index);
+            TimeSeriesBlock seriesBlock = fragmentOpChain.nextBlock();
+            Worker.TimeSeriesResponse response = Worker.TimeSeriesResponse.newBuilder()
+                .setPayload(TimeSeriesBlockSerde.serializeTimeSeriesBlock(seriesBlock))
+                .putAllMetadata(ImmutableMap.of(WorkerResponseMetadataKeys.PLAN_ID, currentPlanId))
+                .build();
+            responseObserver.onNext(response);
+          }
           responseObserver.onCompleted();
         } catch (Throwable t) {
-          handleErrors.accept(t);
+          handleErrors.accept(Pair.of(t, currentPlanId));
         }
       });
     } catch (Throwable t) {
       LOGGER.error("Error running time-series query", t);
-      handleErrors.accept(t);
+      handleErrors.accept(Pair.of(t, ""));
     }
   }

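The rewritten processTimeSeriesQuery above is the heart of the multi-server change: the server now accepts a list of serialized plan fragments, compiles each into its own operator chain, and streams one response per fragment, tagging both results and errors with the originating plan ID. On the receiving side, the broker can then demultiplex the stream into per-fragment queues. A rough sketch of that demux step, under the assumption that the broker keeps one BlockingQueue per plan ID (as suggested by getExchangeReceiverByPlanId further down; the raw metadata key strings here are placeholders for the WorkerResponseMetadataKeys constants):

import java.util.Map;
import java.util.concurrent.BlockingQueue;

// Illustrative only: routes each per-fragment server response (or error) to the
// queue registered for the plan ID that produced it.
public class TimeSeriesResponseDemuxSketch {
  private final Map<String, BlockingQueue<Object>> _receiversByPlanId;

  public TimeSeriesResponseDemuxSketch(Map<String, BlockingQueue<Object>> receiversByPlanId) {
    _receiversByPlanId = receiversByPlanId;
  }

  public void onResponse(Map<String, String> metadata, byte[] payload) {
    // "planId" / "error" stand in for the WorkerResponseMetadataKeys constants used in the diff.
    String planId = metadata.getOrDefault("planId", "");
    BlockingQueue<Object> receiver = _receiversByPlanId.get(planId);
    if (receiver == null) {
      return; // unknown fragment; a real implementation would surface this as an error
    }
    if (metadata.containsKey("error")) {
      receiver.offer(new RuntimeException(metadata.get("error")));
    } else {
      receiver.offer(payload); // a real implementation would deserialize via TimeSeriesBlockSerde first
    }
  }
}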
==========================================================================
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.timeseries;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import org.apache.pinot.tsdb.planner.TimeSeriesExchangeNode;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;


public class PhysicalTimeSeriesBrokerPlanVisitor {
  // Warning: Don't use a singleton access pattern, since Quickstarts run in a single JVM and spawn multiple brokers/servers.
public PhysicalTimeSeriesBrokerPlanVisitor() {
}

public void init() {
}

public BaseTimeSeriesOperator compile(BaseTimeSeriesPlanNode rootNode, TimeSeriesExecutionContext context,
Map<String, Integer> numInputServersByExchangeNode) {
// Step-1: Replace time series exchange node with its Physical Plan Node.
rootNode = initExchangeReceivePlanNode(rootNode, context, numInputServersByExchangeNode);
// Step-2: Trigger recursive operator generation
return rootNode.run();
}

public BaseTimeSeriesPlanNode initExchangeReceivePlanNode(BaseTimeSeriesPlanNode planNode,
TimeSeriesExecutionContext context, Map<String, Integer> numInputServersByExchangeNode) {
if (planNode instanceof LeafTimeSeriesPlanNode) {
throw new IllegalStateException("Found leaf time series plan node in broker");
} else if (planNode instanceof TimeSeriesExchangeNode) {
int numInputServers = numInputServersByExchangeNode.get(planNode.getId());
return compileToPhysicalReceiveNode((TimeSeriesExchangeNode) planNode, context, numInputServers);
}
List<BaseTimeSeriesPlanNode> newInputs = new ArrayList<>();
for (int index = 0; index < planNode.getInputs().size(); index++) {
BaseTimeSeriesPlanNode inputNode = planNode.getInputs().get(index);
if (inputNode instanceof TimeSeriesExchangeNode) {
int numInputServers = numInputServersByExchangeNode.get(inputNode.getId());
TimeSeriesExchangeReceivePlanNode exchangeReceivePlanNode = compileToPhysicalReceiveNode(
(TimeSeriesExchangeNode) inputNode, context, numInputServers);
newInputs.add(exchangeReceivePlanNode);
} else {
newInputs.add(initExchangeReceivePlanNode(inputNode, context, numInputServersByExchangeNode));
}
}
return planNode.withInputs(newInputs);
}

TimeSeriesExchangeReceivePlanNode compileToPhysicalReceiveNode(TimeSeriesExchangeNode exchangeNode,
TimeSeriesExecutionContext context, int numServersQueried) {
TimeSeriesExchangeReceivePlanNode exchangeReceivePlanNode = new TimeSeriesExchangeReceivePlanNode(
exchangeNode.getId(), context.getDeadlineMs(), exchangeNode.getAggInfo(), context.getSeriesBuilderFactory());
BlockingQueue<Object> receiver = context.getExchangeReceiverByPlanId().get(exchangeNode.getId());
exchangeReceivePlanNode.init(Objects.requireNonNull(receiver, "No receiver for node"), numServersQueried);
return exchangeReceivePlanNode;
}
}
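This visitor is new in this commit: it rewrites the broker's fragment of the logical plan, replacing every TimeSeriesExchangeNode with a TimeSeriesExchangeReceivePlanNode wired to the matching receiver queue, and failing fast if a leaf node ever reaches the broker. A hedged usage sketch of the compile() entry point (the caller-side setup of the plan and context is assumed, since it is not shown in this diff):

import java.util.Map;
import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor;
import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;

public class BrokerCompileSketch {
  // Compiles and runs the broker-side fragment. `context` is assumed to carry one
  // receiver queue per exchange node ID, and the map tells each receive node how
  // many server responses to wait for.
  static TimeSeriesBlock runBrokerFragment(BaseTimeSeriesPlanNode brokerPlanRoot,
      TimeSeriesExecutionContext context, Map<String, Integer> numInputServersByExchangeNode) {
    PhysicalTimeSeriesBrokerPlanVisitor visitor = new PhysicalTimeSeriesBrokerPlanVisitor();
    BaseTimeSeriesOperator rootOperator = visitor.compile(brokerPlanRoot, context, numInputServersByExchangeNode);
    // nextBlock() blocks until the receive node(s) have consumed one payload per queried server.
    return rootOperator.nextBlock();
  }
}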
==========================================================================
@@ -20,6 +20,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -56,26 +57,34 @@ public PhysicalTimeSeriesServerPlanVisitor(QueryExecutor queryExecutor, Executor
   }
 
   public BaseTimeSeriesOperator compile(BaseTimeSeriesPlanNode rootNode, TimeSeriesExecutionContext context) {
-    // Step-1: Replace scan filter project with our physical plan node with Pinot Core and Runtime context
-    initLeafPlanNode(rootNode, context);
+    // Step-1: Replace leaf node with our physical plan node with Pinot Core and Runtime context
+    rootNode = initLeafPlanNode(rootNode, context);
     // Step-2: Trigger recursive operator generation
     return rootNode.run();
   }
 
-  public void initLeafPlanNode(BaseTimeSeriesPlanNode planNode, TimeSeriesExecutionContext context) {
+  public BaseTimeSeriesPlanNode initLeafPlanNode(BaseTimeSeriesPlanNode planNode, TimeSeriesExecutionContext context) {
+    if (planNode instanceof LeafTimeSeriesPlanNode) {
+      return convertLeafToPhysicalTableScan((LeafTimeSeriesPlanNode) planNode, context);
+    }
+    List<BaseTimeSeriesPlanNode> newInputs = new ArrayList<>();
     for (int index = 0; index < planNode.getInputs().size(); index++) {
       BaseTimeSeriesPlanNode childNode = planNode.getInputs().get(index);
       if (childNode instanceof LeafTimeSeriesPlanNode) {
         LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) childNode;
-        List<String> segments = context.getPlanIdToSegmentsMap().get(leafNode.getId());
-        ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(leafNode, segments, context);
-        TimeSeriesPhysicalTableScan physicalTableScan = new TimeSeriesPhysicalTableScan(childNode.getId(),
-            serverQueryRequest, _queryExecutor, _executorService);
-        planNode.getInputs().set(index, physicalTableScan);
+        newInputs.add(convertLeafToPhysicalTableScan(leafNode, context));
       } else {
-        initLeafPlanNode(childNode, context);
+        newInputs.add(initLeafPlanNode(childNode, context));
       }
     }
+    return planNode.withInputs(newInputs);
   }
 
+  private TimeSeriesPhysicalTableScan convertLeafToPhysicalTableScan(LeafTimeSeriesPlanNode leafNode,
+      TimeSeriesExecutionContext context) {
+    List<String> segments = context.getPlanIdToSegmentsMap().get(leafNode.getId());
+    ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(leafNode, segments, context);
+    return new TimeSeriesPhysicalTableScan(leafNode.getId(), serverQueryRequest, _queryExecutor, _executorService);
+  }
+
   public ServerQueryRequest compileLeafServerQueryRequest(LeafTimeSeriesPlanNode leafNode, List<String> segments,
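Beyond relocating the leaf conversion into a helper, the notable change above is stylistic but load-bearing: initLeafPlanNode no longer mutates the child list in place (the old planNode.getInputs().set(index, ...)) and instead returns a rebuilt tree via withInputs, leaving the original logical plan untouched. The new broker visitor follows the same copy-on-rewrite pattern. A minimal generic illustration of that pattern (Node is a hypothetical stand-in for BaseTimeSeriesPlanNode):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Copy-on-rewrite over an immutable plan tree: leaves are swapped via a supplied
// function, inner nodes are rebuilt with withInputs() rather than mutated.
interface Node {
  List<Node> getInputs();
  boolean isLeaf();
  Node withInputs(List<Node> newInputs);
}

final class PlanTreeRewriterSketch {
  static Node rewrite(Node node, Function<Node, Node> leafReplacer) {
    if (node.isLeaf()) {
      return leafReplacer.apply(node);
    }
    List<Node> newInputs = new ArrayList<>();
    for (Node input : node.getInputs()) {
      newInputs.add(rewrite(input, leafReplacer)); // recurse instead of set()-ing into the old list
    }
    return node.withInputs(newInputs);
  }
}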
==========================================================================
@@ -20,23 +20,31 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactory;
+import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
 
 
 public class TimeSeriesExecutionContext {
   private final String _language;
   private final TimeBuckets _initialTimeBuckets;
   private final Map<String, List<String>> _planIdToSegmentsMap;
+  private final Map<String, BlockingQueue<Object>> _exchangeReceiverByPlanId;
   private final long _deadlineMs;
   private final Map<String, String> _metadataMap;
+  private final TimeSeriesBuilderFactory _seriesBuilderFactory;
 
-  public TimeSeriesExecutionContext(String language, TimeBuckets initialTimeBuckets,
-      Map<String, List<String>> planIdToSegmentsMap, long deadlineMs, Map<String, String> metadataMap) {
+  public TimeSeriesExecutionContext(String language, TimeBuckets initialTimeBuckets, long deadlineMs,
+      Map<String, String> metadataMap, Map<String, List<String>> planIdToSegmentsMap,
+      Map<String, BlockingQueue<Object>> exchangeReceiverByPlanId) {
     _language = language;
     _initialTimeBuckets = initialTimeBuckets;
-    _planIdToSegmentsMap = planIdToSegmentsMap;
     _deadlineMs = deadlineMs;
     _metadataMap = metadataMap;
+    _planIdToSegmentsMap = planIdToSegmentsMap;
+    _exchangeReceiverByPlanId = exchangeReceiverByPlanId;
+    _seriesBuilderFactory = TimeSeriesBuilderFactoryProvider.getSeriesBuilderFactory(language);
   }
 
   public String getLanguage() {
@@ -47,8 +55,8 @@ public TimeBuckets getInitialTimeBuckets() {
     return _initialTimeBuckets;
   }
 
-  public Map<String, List<String>> getPlanIdToSegmentsMap() {
-    return _planIdToSegmentsMap;
+  public long getDeadlineMs() {
+    return _deadlineMs;
   }
 
   public long getRemainingTimeMs() {
@@ -58,4 +66,16 @@ public long getRemainingTimeMs() {
   public Map<String, String> getMetadataMap() {
     return _metadataMap;
   }
+
+  public Map<String, List<String>> getPlanIdToSegmentsMap() {
+    return _planIdToSegmentsMap;
+  }
+
+  public Map<String, BlockingQueue<Object>> getExchangeReceiverByPlanId() {
+    return _exchangeReceiverByPlanId;
+  }
+
+  public TimeSeriesBuilderFactory getSeriesBuilderFactory() {
+    return _seriesBuilderFactory;
+  }
 }
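Tying the context changes together: the constructor now takes the exchange receiver map as its last argument, and the server side passes Collections.emptyMap() for it (see processTimeSeriesQuery above) since servers only produce exchange data, never consume it. A hedged construction sketch for the broker side (the queue capacity, the "exchange_0" ID, and the "m3ql" language string are all placeholders):

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import org.apache.pinot.tsdb.spi.TimeBuckets;

public class ExecutionContextSketch {
  static TimeSeriesExecutionContext brokerSideContext(TimeBuckets timeBuckets, long deadlineMs,
      Map<String, String> metadataMap) {
    // One receiver queue per exchange node ID; "exchange_0" and capacity 64 are illustrative.
    Map<String, BlockingQueue<Object>> receivers = Map.of("exchange_0", new ArrayBlockingQueue<>(64));
    // Brokers don't scan segments directly, so the plan-to-segments map stays empty here.
    return new TimeSeriesExecutionContext("m3ql", timeBuckets, deadlineMs, metadataMap,
        Map.of(), receivers);
  }
}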
(Diffs for the remaining changed files are not shown here.)
