Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Server API to list segments that need to be refreshed for a table #14451

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.segment.local.data.manager.NeedRefreshResponse;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
Expand Down Expand Up @@ -891,6 +892,31 @@ public String getTableReloadMetadata(
}
}

@GET
@Path("segments/{tableNameWithType}/needRefresh")
@Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = Actions.Table.GET_METADATA)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Gets a list of segments that need to be refreshed from servers hosting the table", notes =
"Gets a list of segments that need to be refreshed from servers hosting the table")
public Map<String, List<NeedRefreshResponse>> getTableRefreshMetadata(
swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved
@ApiParam(value = "Table name with type", required = true, example = "myTable_REALTIME")
@PathParam("tableNameWithType") String tableNameWithType, @Context HttpHeaders headers) {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers);
LOGGER.info("Received a request to check for segments requiring a refresh from all servers hosting segments for "
+ "table {}", tableNameWithType);
try {
TableMetadataReader tableMetadataReader =
new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
return tableMetadataReader.getSegmentsForRefresh(tableNameWithType,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
} catch (InvalidConfigException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Status.BAD_REQUEST);
} catch (IOException ioe) {
throw new ControllerApplicationException(LOGGER, "Error parsing Pinot server response: " + ioe.getMessage(),
Status.INTERNAL_SERVER_ERROR, ioe);
}
}

@GET
@Path("segments/{tableName}/zkmetadata")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_METADATA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.segment.local.data.manager.NeedRefreshResponse;
import org.apache.pinot.spi.utils.JsonUtils;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
Expand Down Expand Up @@ -397,6 +398,39 @@ public ValidDocIdsBitmapResponse getValidDocIdsBitmapFromServer(String tableName
return response;
}

public Map<String, List<NeedRefreshResponse>> getSegmentsForRefreshFromServer(
String tableNameWithType, Set<String> serverInstances, BiMap<String, String> endpoints, int timeoutMs) {
LOGGER.debug("Getting list of segments for refresh from servers for table {}.", tableNameWithType);
List<String> serverURLs = new ArrayList<>();
for (String serverInstance : serverInstances) {
serverURLs.add(generateNeedRefreshSegmentsServerURL(tableNameWithType, endpoints.get(serverInstance)));
}
BiMap<String, String> endpointsToServers = endpoints.inverse();
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(serverURLs, tableNameWithType, true, timeoutMs);
Map<String, List<NeedRefreshResponse>> serverResponses = new HashMap<>();

int failedParses = 0;
for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
try {
// TODO: RV - get the instance name instead of the endpoint
swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved
serverResponses.put(streamResponse.getKey(),
JsonUtils.stringToObject(streamResponse.getValue(),
new TypeReference<List<NeedRefreshResponse>>() { }));
} catch (Exception e) {
failedParses++;
LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e);
swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved
}
}
if (failedParses != 0) {
LOGGER.error("Unable to parse server {} / {} response due to an error: ", failedParses, serverURLs.size());
vrajat marked this conversation as resolved.
Show resolved Hide resolved
}

return serverResponses;
}

private String generateAggregateSegmentMetadataServerURL(String tableNameWithType, List<String> columns,
String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
Expand Down Expand Up @@ -470,4 +504,9 @@ private String generateColumnsParam(List<String> columns) {
paramsStr = String.join("&", params);
return paramsStr;
}

private String generateNeedRefreshSegmentsServerURL(String tableNameWithType, String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
return String.format("%s/tables/%s/segments/needRefresh", endpoint, tableNameWithType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.segment.local.data.manager.NeedRefreshResponse;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand Down Expand Up @@ -199,4 +200,17 @@ public JsonNode getAggregateValidDocIdsMetadata(String tableNameWithType, List<S
segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest);
return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
}

public Map<String, List<NeedRefreshResponse>> getSegmentsForRefresh(String tableNameWithType,
int timeoutMs)
throws InvalidConfigException, IOException {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
List<String> serverInstances = _pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, tableType);
Set<String> serverInstanceSet = new HashSet<>(serverInstances);
BiMap<String, String> endpoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverInstanceSet);
ServerSegmentMetadataReader serverSegmentMetadataReader =
new ServerSegmentMetadataReader(_executor, _connectionManager);
return serverSegmentMetadataReader.getSegmentsForRefreshFromServer(tableNameWithType, serverInstanceSet, endpoints,
timeoutMs);
}
}
Loading
Loading