Skip to content

Commit

Permalink
Batch reload api to specify what segments to be reloaded on what serv…
Browse files Browse the repository at this point in the history
…ers to be more flexible (#14544)

* extend the existing reload-all-segments API to make it more flexible, by taking a map to reload different batches of segments on different instances
  • Loading branch information
klsince authored Dec 5, 2024
1 parent 64a952f commit 95b075b
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
*/
package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
Expand All @@ -39,6 +40,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -87,6 +89,7 @@
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
Expand Down Expand Up @@ -403,8 +406,8 @@ public SuccessResponse reloadSegment(
int numReloadMsgSent = msgInfo.getLeft();
if (numReloadMsgSent > 0) {
try {
if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
startTimeMs, numReloadMsgSent)) {
if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, targetInstance,
msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
zkJobMetaWriteSuccess = true;
} else {
LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}",
Expand Down Expand Up @@ -533,34 +536,28 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
}

String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
Map<String, List<String>> serverToSegments;

String singleSegmentName =
String segmentNames =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
if (singleSegmentName != null) {
// No need to query servers where this segment is not supposed to be hosted
serverToSegments = new TreeMap<>();
List<String> segmentList = Collections.singletonList(singleSegmentName);
_pinotHelixResourceManager.getServers(tableNameWithType, singleSegmentName).forEach(server -> {
serverToSegments.put(server, segmentList);
});
} else {
serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
}
String instanceName =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
Map<String, List<String>> serverToSegments = getServerToSegments(tableNameWithType, segmentNames, instanceName);

BiMap<String, String> serverEndPoints =
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);

List<String> serverUrls = new ArrayList<>();
BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
for (String endpoint : endpointsToServers.keySet()) {
for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
String server = entry.getKey();
String endpoint = entry.getValue();
String reloadTaskStatusEndpoint =
endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp="
+ controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
if (singleSegmentName != null) {
reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName;
if (segmentNames != null) {
List<String> targetSegments = serverToSegments.get(server);
reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + StringUtils.join(targetSegments,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
}
serverUrls.add(reloadTaskStatusEndpoint);
}
Expand Down Expand Up @@ -615,6 +612,31 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
return serverReloadControllerJobStatusResponse;
}

@VisibleForTesting
Map<String, List<String>> getServerToSegments(String tableNameWithType, @Nullable String segmentNames,
    @Nullable String instanceName) {
  // Resolves which servers host which of the segments targeted by a reload job, so that the job
  // status check only queries the servers actually involved.
  //
  // @param tableNameWithType table name with type suffix, e.g. "myTable_OFFLINE"
  // @param segmentNames separator-joined segment names from the job ZK metadata, or null to cover all segments
  // @param instanceName target instance from the job ZK metadata, or null if the job was not instance-scoped
  // @return map from server instance name to the list of segments to check on that server
  if (segmentNames == null) {
    // Reload-all job: getServerToSegmentsMap() handles both a null and a non-null instanceName.
    return _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, instanceName);
  }
  // Skip servers and segments not involved in the segment reloading job.
  // (Fixed misspelled local "segmnetNameList" -> "segmentNameList".)
  List<String> segmentNameList = new ArrayList<>();
  Collections.addAll(segmentNameList, StringUtils.split(segmentNames, SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
  if (instanceName != null) {
    return Map.of(instanceName, segmentNameList);
  }
  // If instance is null, then either one or all segments are being reloaded via the current segment reload
  // restful APIs. The if-check at the beginning of this method has handled the case of reloading all segments,
  // so here we expect exactly one segment name.
  Preconditions.checkState(segmentNameList.size() == 1, "Only one segment is expected but got: %s",
      segmentNameList);
  Map<String, List<String>> serverToSegments = new HashMap<>();
  for (String server : _pinotHelixResourceManager.getServers(tableNameWithType, segmentNames)) {
    serverToSegments.put(server, Collections.singletonList(segmentNames));
  }
  return serverToSegments;
}

@POST
@Path("segments/{tableName}/reload")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RELOAD_SEGMENT)
Expand All @@ -627,10 +649,11 @@ public SuccessResponse reloadAllSegments(
@ApiParam(value = "Whether to force server to download segment") @QueryParam("forceDownload")
@DefaultValue("false") boolean forceDownload,
@ApiParam(value = "Name of the target instance to reload") @QueryParam("targetInstance") @Nullable
String targetInstance, @Context HttpHeaders headers)
throws JsonProcessingException {
String targetInstance,
@ApiParam(value = "Map from instances to segments to reload. This param takes precedence over targetInstance")
@QueryParam("instanceToSegmentsMap") @Nullable String instanceToSegmentsMapInJson, @Context HttpHeaders headers)
throws IOException {
tableName = DatabaseUtils.translateTableName(tableName, headers);
long startTimeMs = System.currentTimeMillis();
TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
// When rawTableName is provided but w/o table type, Pinot tries to reload both OFFLINE
Expand All @@ -644,6 +667,20 @@ public SuccessResponse reloadAllSegments(
List<String> tableNamesWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
LOGGER);
if (instanceToSegmentsMapInJson != null) {
Map<String, List<String>> instanceToSegmentsMap =
JsonUtils.stringToObject(instanceToSegmentsMapInJson, new TypeReference<>() {
});
Map<String, Map<String, Map<String, String>>> tableInstanceMsgData =
reloadSegments(tableNamesWithType, forceDownload, instanceToSegmentsMap);
if (tableInstanceMsgData.isEmpty()) {
throw new ControllerApplicationException(LOGGER,
String.format("Failed to find any segments in table: %s with instanceToSegmentsMap: %s", tableName,
instanceToSegmentsMap), Status.NOT_FOUND);
}
return new SuccessResponse(JsonUtils.objectToString(tableInstanceMsgData));
}
long startTimeMs = System.currentTimeMillis();
Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
for (String tableNameWithType : tableNamesWithType) {
Pair<Integer, String> msgInfo =
Expand All @@ -658,8 +695,8 @@ public SuccessResponse reloadAllSegments(
perTableMsgData.put(tableNameWithType, tableReloadMeta);
// Store in ZK
try {
if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(), startTimeMs,
numReloadMsgSent)) {
if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, targetInstance, msgInfo.getRight(),
startTimeMs, numReloadMsgSent)) {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
} else {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
Expand All @@ -678,6 +715,48 @@ public SuccessResponse reloadAllSegments(
return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
}

/**
 * Sends reload messages for the given instance-to-segments assignment on each table and records a reload
 * job in ZK per (table, instance) for which messages were actually sent.
 *
 * <p>Fix: the per-table entry is only added to the returned map once at least one reload message has been
 * sent for that table. Previously {@code computeIfAbsent} inserted an empty entry for every table, which
 * made the caller's {@code isEmpty()} check (used to return NOT_FOUND when no segments matched) ineffective.
 *
 * @param tableNamesWithType tables (with type suffix) to reload segments for
 * @param forceDownload whether servers should re-download the segments instead of reloading in place
 * @param instanceToSegmentsMap map from server instance name to the segments to reload on that instance
 * @return map of table -> instance -> reload metadata (message count, job id, ZK write status)
 */
private Map<String, Map<String, Map<String, String>>> reloadSegments(List<String> tableNamesWithType,
    boolean forceDownload, Map<String, List<String>> instanceToSegmentsMap) {
  long startTimeMs = System.currentTimeMillis();
  Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = new LinkedHashMap<>();
  for (String tableNameWithType : tableNamesWithType) {
    Map<String, Pair<Integer, String>> instanceMsgInfoMap =
        _pinotHelixResourceManager.reloadSegments(tableNameWithType, forceDownload, instanceToSegmentsMap);
    for (Map.Entry<String, Pair<Integer, String>> instanceMsgInfo : instanceMsgInfoMap.entrySet()) {
      String instance = instanceMsgInfo.getKey();
      Pair<Integer, String> msgInfo = instanceMsgInfo.getValue();
      int numReloadMsgSent = msgInfo.getLeft();
      if (numReloadMsgSent <= 0) {
        // Nothing was sent to this instance; skip it entirely so it doesn't show up in the response.
        continue;
      }
      Map<String, String> tableReloadMeta = new HashMap<>();
      tableReloadMeta.put("numMessagesSent", String.valueOf(numReloadMsgSent));
      tableReloadMeta.put("reloadJobId", msgInfo.getRight());
      // Materialize the per-table entry lazily — only tables with at least one reload message appear in
      // the result, so the caller can detect "no segments reloaded" via isEmpty().
      tableInstanceMsgData.computeIfAbsent(tableNameWithType, t -> new HashMap<>()).put(instance, tableReloadMeta);
      // Persist reload job metadata into ZK. This is best-effort: failures are reported in the response
      // ("reloadJobMetaZKStorageStatus") rather than failing the whole request.
      try {
        String segmentNames =
            StringUtils.join(instanceToSegmentsMap.get(instance), SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
        if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentNames, instance,
            msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
          tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
        } else {
          tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
          LOGGER.error("Failed to add batch reload job meta into zookeeper for table: {} targeted instance: {}",
              tableNameWithType, instance);
        }
      } catch (Exception e) {
        tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
        LOGGER.error("Failed to add batch reload job meta into zookeeper for table: {} targeted instance: {}",
            tableNameWithType, instance, e);
      }
    }
  }
  return tableInstanceMsgData;
}

@DELETE
@Produces(MediaType.APPLICATION_JSON)
@Path("/segments/{tableName}/{segmentName}")
Expand Down
Loading

0 comments on commit 95b075b

Please sign in to comment.