-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#12117 Support for Server & Controller API to check for Segments reload of a table in servers #13789
#12117 Support for Server & Controller API to check for Segments reload of a table in servers #13789
Conversation
line characters checkstyle
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #13789 +/- ##
============================================
- Coverage 61.75% 57.49% -4.27%
+ Complexity 207 197 -10
============================================
Files 2436 2582 +146
Lines 133233 142544 +9311
Branches 20636 22128 +1492
============================================
- Hits 82274 81949 -325
- Misses 44911 54100 +9189
- Partials 6048 6495 +447
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
boolean mismatchCheck = false; | ||
for (SegmentDataManager segmentDataManager : segmentDataManagers) { | ||
Set<String> segmentColumns = SegmentMetadataFetcher.getSegmentColumns(segmentDataManager, columns); | ||
if (!segmentColumns.containsAll(schemaColumns)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we would need to also check indexing changes. reload will be needed for indexing changes, dictionary changes, along with new column added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to use little detailed check from SegmentPreProcessor.needProcess() to compare the same. But I am doubtful of the amount of time it would take. Do you want me to just compare the indexes map corresponding to each column instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which specific part about needPreprocess are you doubtful about?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@npawar We should be good. I was thinking there were some extra config checks happening in needProcess!
segments reload check including indexes metadata
checkstyle unused import removal
remove unused code to get the columns
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The overall direction looks good!
public String checkMismatchedSegments( | ||
@ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName, | ||
@ApiParam(value = "Column name", allowMultiple = true) @QueryParam("columns") @DefaultValue("") | ||
List<String> columns, @Context HttpHeaders headers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is columns
needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed, removed
Pair<TableConfig, Schema> tableConfigSchema = tableDataManager.fetchTableConfigAndSchema(); | ||
IndexLoadingConfig indexLoadingConfig = | ||
tableDataManager.getIndexLoadingConfig(tableConfigSchema.getLeft(), tableConfigSchema.getRight()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pair<TableConfig, Schema> tableConfigSchema = tableDataManager.fetchTableConfigAndSchema(); | |
IndexLoadingConfig indexLoadingConfig = | |
tableDataManager.getIndexLoadingConfig(tableConfigSchema.getLeft(), tableConfigSchema.getRight()); | |
IndexLoadingConfig indexLoadingConfig = tableDataManager.fetchIndexLoadingConfig(); |
boolean mismatchCheck = false; | ||
for (SegmentDataManager segmentDataManager : segmentDataManagers) { | ||
SegmentZKMetadata segmentZKMetadata = tableDataManager.fetchZKMetadata(segmentDataManager.getSegmentName()); | ||
if (tableDataManager.checkReloadSegment(segmentZKMetadata, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest pushing down the logic completely into the TableDataManager
so that it is easier to manage and extend in the future. Basically add an API needReload()
to the TableDataManager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@GET | ||
@Path("/tables/{tableName}/segments/mismatch") | ||
@Produces(MediaType.APPLICATION_JSON) | ||
@ApiOperation(value = "Checks if there is any mismatch of columns in a segment", notes = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API path and doc is a little bit misleading. We want to check if reload is needed, instead of checking column mismatch
* | ||
* It has details of server id and returns true/false if there are any segments to be reloaded or not. | ||
*/ | ||
public class SegmentColumnMismatchResponse { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest renaming it to SegmentReloadCheckResponse
and rename the fields accordingly
boolean _isMismatch; | ||
String _serverInstanceId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They can be defined as private final
|
||
// If the segment doesn't exist on server or its CRC has changed, then we | ||
// need to fall back to download the segment from deep store to load it. | ||
if (segmentMetadata == null || !hasSameCRC(zkMetadata, segmentMetadata)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not read ZK metadata or do CRC check here because it can drastically increase the ZK load. needPreprocess()
check should be enough
Schema schema = indexLoadingConfig.getSchema(); | ||
//if the reload of the segment is not needed then return false | ||
if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema)) { | ||
_logger.info("Segment: {} is consistent with latest table config and schema", segmentName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can flood the log (consider thousands of segments on each server)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed the log
private final String _serverInstanceId; | ||
|
||
@JsonCreator | ||
public SegmentsReloadCheckResponse(@JsonProperty("isReload") boolean isReload, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: needReload
instead?
@Context HttpHeaders headers) { | ||
tableName = DatabaseUtils.translateTableName(tableName, headers); | ||
TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName); | ||
boolean isSegmentsReload = tableDataManager.needReloadSegments(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename the output to sth like needReload
?
|
||
@JsonCreator | ||
public SegmentsReloadCheckResponse(@JsonProperty("isReload") boolean isReload, | ||
@JsonProperty("serverInstanceId") String serverInstanceId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why the serverInstanceId
needs to be returned as part of the response payload here? Didn't we already know where the request was sent to?
//if re processing or reload is needed on a segment then return true | ||
return ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema); | ||
} catch (Exception e) { | ||
_logger.error("Failed to check if reload is needed for a segment {}", segmentName, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we emit a server metric to indicate that something is wrong when running the check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel that might be overkilling. IMO we can simply throw the exception out, and client can handle it accordingly.
public boolean needReloadSegments() { | ||
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig(); | ||
List<SegmentDataManager> segmentDataManagers = acquireAllSegments(); | ||
boolean segmentsReloadCheck = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename it to sth like needSegmentReload
* Checks for a particular segment, reload is needed or not | ||
* @return true if the reload is needed on the segment | ||
*/ | ||
boolean checkReloadSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unify the method name to sth like needReloadSegment(...)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this API anymore
} catch (Exception e) { | ||
_logger.error("Failed to check if reload is needed for a segment {}", segmentName, e); | ||
closeSegmentDirectoryQuietly(segmentDirectory); | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw you stated in the API description that 500
will be returned if there is any server side error. Shouldn't we return 500
here in this case?
remove unused imports
Remove serverInstanceId from the response
Fix the test
return _serverInstanceId; | ||
} | ||
|
||
public boolean getReload() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method name might not work for JSON serialization. I think it will be mistakenly serialized as reload
. Can you add a test to verify? We might need to rename it to isNeedReload()
, or explicitly annotate it with @JsonProperty("needReload")
. If we explicitly annotate, a better name could be isReloadNeeded()
*/ | ||
public class SegmentsReloadCheckResponse { | ||
private final boolean _needReload; | ||
private final String _serverInstanceId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) instanceId
might be more concise because this is always returned from server.
Do we really need to return it? Controller should already have this info when reading the address of the server
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have it, it's just little complex to separate out server instance when doing multi get request which is why just returning it as well in the response to make it simpler. I will look into this later. For now, we can include instanceId as well I think
throws Exception { | ||
String segmentName = zkMetadata.getSegmentName(); | ||
SegmentDirectory segmentDirectory = | ||
tryInitSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can cause exception for consuming segments. We should only check immutable segments. We can read the SegmentDirectory
out from ImmutableSegmentImpl
. We can consider adding a test in BaseClusterIntegrationTestSet.testReload()
to verify if the API works. You may also add the test after the controller side integration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked for ImmutableSegmentImpl instance before passing it through needReload where SegmentDirectory is generated. I think this should avoid any exception cases of consuming segments. Adding more tests along with controller side code and to combine with the same PR.
boolean needReload = false; | ||
try { | ||
for (SegmentDataManager segmentDataManager : segmentDataManagers) { | ||
SegmentZKMetadata segmentZKMetadata = fetchZKMetadata(segmentDataManager.getSegmentName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not read ZK metadata because it needs to access ZK, and we might end up reading it for thousands of segments.
@@ -1013,6 +1013,22 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading | |||
} | |||
} | |||
|
|||
boolean needReloadSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it private
Schema schema = indexLoadingConfig.getSchema(); | ||
//if re processing or reload is needed on a segment then return true | ||
return ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can probably directly throw the exception out without catch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closeSegmentDirectoryQuietly(segmentDirectory); There is this closure being handled in exception cases( so wrote a catch block). Do you want me to handle this in finally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we can use the SegmentDirectory
within ImmutableSegmentImpl
, we don't need to close it here. Ideally we don't want to create new SegmentDirectory
in this check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Add Controller Side API and test in integration
Refactoring changes and test failures fixes
reverse merge
Code Readability and serializer tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly good
...c/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/pinot/common/restlet/resources/ServerSegmentsReloadCheckResponse.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/pinot/common/restlet/resources/TableSegmentsReloadCheckResponse.java
Outdated
Show resolved
Hide resolved
pinot-common/src/test/java/org/apache/pinot/common/utils/SerializerResponseTest.java
Outdated
Show resolved
Hide resolved
...ler/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
Outdated
Show resolved
Hide resolved
public Map<String, JsonNode> getServerCheckSegmentsReloadMetadata(String tableName, TableType tableType, | ||
int timeoutMs) | ||
throws InvalidConfigException, IOException { | ||
String tableNameWithType = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check should be performed in the RestletResource
class before calling this method
@@ -721,6 +721,11 @@ public String reloadOfflineTable(String tableName, boolean forceDownload) | |||
return getControllerRequestClient().reloadTable(tableName, TableType.OFFLINE, forceDownload); | |||
} | |||
|
|||
public String needReloadOfflineTable(String tableNameWithType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename it to checkIfReloadIsNeeded
. Does it only apply to offline table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It applies to both realtime and offline tables, changing the name
...rc/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
Outdated
Show resolved
Hide resolved
@@ -239,6 +239,11 @@ public String forTableReload(String tableName, TableType tableType, boolean forc | |||
return StringUtil.join("/", _baseUrl, "segments", tableName, query); | |||
} | |||
|
|||
public String forTableNeedReload(String tableNameWithType) { | |||
String query = String.format("needReload?verbose=%s", "false"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) No need to format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tested for false and true verbose flags. Changed this method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
Outdated
Show resolved
Hide resolved
pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Xiaotian (Jackie) Jiang <17555551+Jackie-Jiang@users.noreply.github.com>
…hi912/pinot into Reload_Segment_Check_12117
…s reload of a table in servers (apache#13789)
Context:
#12117
This PR adds two APIs:
Server API
Response for the server API will be something like :
{
"instanceId": "Server_10.0.0.215_7050",
"needReload": false
}
Controller API
2) Controller API which checks if the reload is needed on a table taking into verbose flag consideration:
{
"needReload": false,
"serverToSegmentsReloadList": {
"Server_10.0.0.215_7050": {
"needReload": false,
"instanceId": "Server_10.0.0.215_7050"
}
}
}
{
"needReload": false,
"serverToSegmentsReloadList": {}
}
Extension: This api can add extra details about the list of segments to be reloaded in the response in the future.
Responses from Controller API: