Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#12117 Support for Server & Controller API to check for Segments reload of a table in servers #13789

Merged
merged 45 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
03ab3be
#12117 Support for Server API to check for Segments reload
deepthi912 Aug 9, 2024
557eb73
checkstyle exception
deepthi912 Aug 9, 2024
ab901a3
line characters checkstyle
deepthi912 Aug 9, 2024
4e4a961
segments reload check including indexes metadata
deepthi912 Aug 12, 2024
e3ea6a8
checkstyle unused import removal
deepthi912 Aug 12, 2024
de79260
remove unused code to get the columns
deepthi912 Aug 13, 2024
36beb11
refactoring and review comments fixes
deepthi912 Aug 13, 2024
0ee036f
checkstyle changes for the condition
deepthi912 Aug 13, 2024
c8ba979
checkstyle exception for import
deepthi912 Aug 13, 2024
e1c7c64
change the flag name
deepthi912 Aug 13, 2024
b9e4fbf
review comments on variable names and response
deepthi912 Aug 14, 2024
eef4de5
test to check for reload segment when adding index
deepthi912 Aug 14, 2024
9b904c6
Remove unimplemented method and make private
deepthi912 Aug 14, 2024
38992b1
Change error responses
deepthi912 Aug 14, 2024
d10b8b2
remove unused imports
deepthi912 Aug 14, 2024
12656ec
Remove serverInstanceId from the response
deepthi912 Aug 14, 2024
b987f0e
Fix the test
deepthi912 Aug 14, 2024
c6aa9ca
Add server instance id back
deepthi912 Aug 15, 2024
8848fa6
review comments fixes
deepthi912 Aug 16, 2024
048eb50
review comments fixes
deepthi912 Aug 16, 2024
e439b54
remove unnecessary code here
deepthi912 Aug 16, 2024
028449c
add license header to the class
deepthi912 Aug 16, 2024
5c50cca
Add Controller Side API and test in integration
deepthi912 Aug 16, 2024
8366580
Refactoring and checkstyle fixes
deepthi912 Aug 17, 2024
25ab04c
import issue
deepthi912 Aug 17, 2024
760450f
unused import removal
deepthi912 Aug 17, 2024
ff0f1a1
checkstyle fixes
deepthi912 Aug 17, 2024
a5880c2
checkstyle exception fix
deepthi912 Aug 17, 2024
a0006c9
Refactoring changes and test failures fixes
deepthi912 Aug 17, 2024
9bac29a
Merge pull request #20 from apache/master
deepthi912 Aug 19, 2024
84bb84b
Code Readability and serializer tests
deepthi912 Aug 22, 2024
5491662
checkstyle fixes
deepthi912 Aug 22, 2024
8173c1c
checkstyle fixes
deepthi912 Aug 22, 2024
3649c78
remove unused method
deepthi912 Aug 22, 2024
a14af52
add json properties
deepthi912 Aug 22, 2024
ad11da4
Review Comments Fixes
deepthi912 Aug 26, 2024
468b36d
Naming fixes
deepthi912 Aug 26, 2024
d1b330a
Naming changes
deepthi912 Aug 26, 2024
ff056b0
Update to Boolean variable
deepthi912 Aug 28, 2024
33a2408
Changes in names
deepthi912 Aug 28, 2024
3d7a0ee
Merge branch 'Reload_Segment_Check_12117' of https://github.com/deept…
deepthi912 Aug 28, 2024
5f555e7
checkstyle fixes
deepthi912 Aug 28, 2024
efa1766
change the format of the comment
deepthi912 Aug 28, 2024
00d2220
test case fix
deepthi912 Aug 28, 2024
aa3a722
test case random failures
deepthi912 Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;


/**
* This class gives the data of a server if there exists any segments that need to be reloaded
*
* It has details of server id and returns true/false if there are any segments to be reloaded or not.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class ServerSegmentsReloadCheckResponse {
@JsonProperty("needReload")
private final boolean _needReload;

@JsonProperty("instanceId")
private final String _instanceId;

public boolean isNeedReload() {
return _needReload;
}

public String getInstanceId() {
return _instanceId;
}

@JsonCreator
public ServerSegmentsReloadCheckResponse(@JsonProperty("needReload") boolean needReload,
@JsonProperty("instanceId") String instanceId) {
_needReload = needReload;
_instanceId = instanceId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Map;


/**
* This class gives list of the details from each server if there exists any segments that need to be reloaded
*
* It has details of reload flag which returns true if reload is needed on table and additional details of the
* respective servers.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class TableSegmentsReloadCheckResponse {
@JsonProperty("needReload")
boolean _needReload;
@JsonProperty("serverToSegmentsCheckReloadList")
Map<String, ServerSegmentsReloadCheckResponse> _serverToSegmentsCheckReloadList;

public boolean isNeedReload() {
return _needReload;
}

public Map<String, ServerSegmentsReloadCheckResponse> getServerToSegmentsCheckReloadList() {
return _serverToSegmentsCheckReloadList;
}

@JsonCreator
public TableSegmentsReloadCheckResponse(@JsonProperty("needReload") boolean needReload,
@JsonProperty("serverToSegmentsCheckReloadList")
Map<String, ServerSegmentsReloadCheckResponse> serverToSegmentsCheckReloadList) {
_needReload = needReload;
_serverToSegmentsCheckReloadList = serverToSegmentsCheckReloadList;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
import org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;


/**
* Tests some of the serializer and deserialization responses from SegmentsReloadCheckResponse class
* needReload will have to be carefully evaluated
*/
public class SegmentsReloadCheckResponseTest {

@Test
public void testSerialization()
throws IOException {
// Given
boolean needReload = true;
String instanceId = "instance123";
ServerSegmentsReloadCheckResponse response = new ServerSegmentsReloadCheckResponse(needReload, instanceId);
Map<String, ServerSegmentsReloadCheckResponse> serversResponse = new HashMap<>();
serversResponse.put(instanceId, response);
TableSegmentsReloadCheckResponse tableResponse = new TableSegmentsReloadCheckResponse(needReload, serversResponse);
String responseString = JsonUtils.objectToPrettyString(response);
String tableResponseString = JsonUtils.objectToPrettyString(tableResponse);

assertNotNull(responseString);
assertNotNull(tableResponseString);
JsonNode tableResponseJsonNode = JsonUtils.stringToJsonNode(tableResponseString);
assertTrue(tableResponseJsonNode.get("needReload").asBoolean());

JsonNode serversList = tableResponseJsonNode.get("serverToSegmentsCheckReloadList");
JsonNode serverResp = serversList.get("instance123");
assertEquals(serverResp.get("instanceId").asText(), "instance123");
assertTrue(serverResp.get("needReload").asBoolean());

assertEquals("{\n" + " \"needReload\" : true,\n" + " \"serverToSegmentsCheckReloadList\" : {\n"
+ " \"instance123\" : {\n" + " \"needReload\" : true,\n" + " \"instanceId\" : \"instance123\"\n"
+ " }\n" + " }\n" + "}", tableResponseString);
assertEquals("{\n" + " \"needReload\" : true,\n" + " \"instanceId\" : \"instance123\"\n" + "}", responseString);
}

@Test
public void testDeserialization()
throws Exception {
String jsonResponse = "{\n" + " \"needReload\": false,\n" + " \"serverToSegmentsCheckReloadList\": {\n"
+ " \"Server_10.0.0.215_7050\": {\n" + " \"needReload\": false,\n"
+ " \"instanceId\": \"Server_10.0.0.215_7050\"\n" + " }\n" + " }\n" + "}";
JsonNode jsonNode = JsonUtils.stringToJsonNode(jsonResponse);
TableSegmentsReloadCheckResponse tableReloadResponse =
JsonUtils.stringToObject(jsonResponse, new TypeReference<TableSegmentsReloadCheckResponse>() {
});
// Then
assertNotNull(jsonNode);
assertFalse(tableReloadResponse.isNeedReload());
assertNotNull(tableReloadResponse.getServerToSegmentsCheckReloadList());
Map<String, ServerSegmentsReloadCheckResponse> serverSegmentReloadResp =
tableReloadResponse.getServerToSegmentsCheckReloadList();
assertEquals(serverSegmentReloadResp.get("Server_10.0.0.215_7050").isNeedReload(), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckResponse;
import org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
Expand Down Expand Up @@ -822,6 +824,47 @@ public String getServerMetadata(
return segmentsMetadata;
}

@GET
@Path("segments/{tableNameWithType}/needReload")
@Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = Actions.Table.GET_METADATA)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Gets the metadata of reload segments check from servers hosting the table", notes =
"Returns true if reload is needed on the table from any one of the servers")
public String getTableReloadMetadata(
@ApiParam(value = "Table name with type", required = true, example = "myTable_REALTIME")
@PathParam("tableNameWithType") String tableNameWithType,
@QueryParam("verbose") @DefaultValue("false") boolean verbose, @Context HttpHeaders headers) {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers);
LOGGER.info("Received a request to check reload for all servers hosting segments for table {}", tableNameWithType);
try {
TableMetadataReader tableMetadataReader =
new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
Map<String, JsonNode> needReloadMetadata =
tableMetadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
boolean needReload =
needReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue());
Map<String, ServerSegmentsReloadCheckResponse> serverResponses = new HashMap<>();
TableSegmentsReloadCheckResponse tableNeedReloadResponse;
if (verbose) {
for (Map.Entry<String, JsonNode> entry : needReloadMetadata.entrySet()) {
serverResponses.put(entry.getKey(),
new ServerSegmentsReloadCheckResponse(entry.getValue().get("needReload").booleanValue(),
entry.getValue().get("instanceId").asText()));
}
tableNeedReloadResponse = new TableSegmentsReloadCheckResponse(needReload, serverResponses);
} else {
tableNeedReloadResponse = new TableSegmentsReloadCheckResponse(needReload, serverResponses);
}
return JsonUtils.objectToPrettyString(tableNeedReloadResponse);
} catch (InvalidConfigException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Status.BAD_REQUEST);
} catch (IOException ioe) {
throw new ControllerApplicationException(LOGGER, "Error parsing Pinot server response: " + ioe.getMessage(),
Status.INTERNAL_SERVER_ERROR, ioe);
}
}

@GET
@Path("segments/{tableName}/zkmetadata")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_METADATA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ public String reloadTable(String tableName, TableType tableType, boolean forceDo
}
}

public String checkIfReloadIsNeeded(String tableNameWithType, Boolean verbose)
throws IOException {
try {
SimpleHttpResponse simpleHttpResponse = HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(
new URI(_controllerRequestURLBuilder.forTableNeedReload(tableNameWithType, verbose)), _headers, null));
return simpleHttpResponse.getResponse();
} catch (HttpErrorStatusException | URISyntaxException e) {
throw new IOException(e);
}
}

public void reloadSegment(String tableName, String segmentName, boolean forceReload)
throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,43 @@ public List<String> getSegmentMetadataFromServer(String tableNameWithType,
return segmentsMetadata;
}

/**
* This method is called when the API request is to fetch data about segment reload of the table.
* This method makes a MultiGet call to all servers that host their respective segments and gets the results.
* This method will return metadata of all the servers along with need reload flag.
* In future additional details like segments list can also be added
*/
public List<String> getCheckReloadSegmentsFromServer(String tableNameWithType, Set<String> serverInstances,
BiMap<String, String> endpoints, int timeoutMs) {
LOGGER.debug("Checking if reload is needed on segments from servers for table {}.", tableNameWithType);
List<String> serverURLs = new ArrayList<>();
for (String serverInstance : serverInstances) {
serverURLs.add(generateCheckReloadSegmentsServerURL(tableNameWithType, endpoints.get(serverInstance)));
}
BiMap<String, String> endpointsToServers = endpoints.inverse();
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(serverURLs, tableNameWithType, true, timeoutMs);
List<String> serversNeedReloadResponses = new ArrayList<>();

int failedParses = 0;
for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
try {
serversNeedReloadResponses.add(streamResponse.getValue());
} catch (Exception e) {
failedParses++;
LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e);
}
}
if (failedParses != 0) {
LOGGER.error("Unable to parse server {} / {} response due to an error: ", failedParses, serverURLs.size());
}

LOGGER.debug("Retrieved metadata of reload check from servers.");
return serversNeedReloadResponses;
}

/**
* This method is called when the API request is to fetch validDocId metadata for a list segments of the given table.
* This method will pick one server randomly that hosts the target segment and fetch the segment metadata result.
Expand Down Expand Up @@ -375,6 +412,11 @@ private String generateSegmentMetadataServerURL(String tableNameWithType, String
return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, tableNameWithType, segmentName, paramsStr);
}

private String generateCheckReloadSegmentsServerURL(String tableNameWithType, String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
return String.format("%s/tables/%s/segments/needReload", endpoint, tableNameWithType);
}

@Deprecated
private String generateValidDocIdsURL(String tableNameWithType, String segmentName, String validDocIdsType,
String endpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -33,7 +34,11 @@
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -44,6 +49,7 @@
* the column indexes available.
*/
public class TableMetadataReader {
private static final Logger LOGGER = LoggerFactory.getLogger(TableMetadataReader.class);
deepthi912 marked this conversation as resolved.
Show resolved Hide resolved
private final Executor _executor;
private final HttpClientConnectionManager _connectionManager;
private final PinotHelixResourceManager _pinotHelixResourceManager;
Expand All @@ -55,6 +61,25 @@ public TableMetadataReader(Executor executor, HttpClientConnectionManager connec
_pinotHelixResourceManager = helixResourceManager;
}

public Map<String, JsonNode> getServerCheckSegmentsReloadMetadata(String tableNameWithType, int timeoutMs)
throws InvalidConfigException, IOException {
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
List<String> serverInstances = _pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType, tableType);
Set<String> serverInstanceSet = new HashSet<>(serverInstances);
BiMap<String, String> endpoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverInstanceSet);
ServerSegmentMetadataReader serverSegmentMetadataReader =
new ServerSegmentMetadataReader(_executor, _connectionManager);
List<String> segmentsMetadata =
serverSegmentMetadataReader.getCheckReloadSegmentsFromServer(tableNameWithType, serverInstanceSet, endpoints,
timeoutMs);
Map<String, JsonNode> response = new HashMap<>();
for (String segmentMetadata : segmentsMetadata) {
JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
response.put(responseJson.get("instanceId").asText(), responseJson);
}
return response;
}

/**
* This api takes in list of segments for which we need the metadata.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,11 @@ public String reloadOfflineTable(String tableName, boolean forceDownload)
return getControllerRequestClient().reloadTable(tableName, TableType.OFFLINE, forceDownload);
}

public String checkIfReloadIsNeeded(String tableNameWithType, Boolean verbose)
throws IOException {
return getControllerRequestClient().checkIfReloadIsNeeded(tableNameWithType, verbose);
}

public void reloadOfflineSegment(String tableName, String segmentName, boolean forceDownload)
throws IOException {
getControllerRequestClient().reloadSegment(tableName, segmentName, forceDownload);
Expand Down
Loading
Loading