#12117 Support for Server & Controller API to check for Segments reload of a table in servers #13789

Merged 45 commits on Aug 28, 2024
Changes from 18 commits
Commits
03ab3be
#12117 Support for Server API to check for Segments reload
deepthi912 Aug 9, 2024
557eb73
checkstyle exception
deepthi912 Aug 9, 2024
ab901a3
line characters checkstyle
deepthi912 Aug 9, 2024
4e4a961
segments reload check including indexes metadata
deepthi912 Aug 12, 2024
e3ea6a8
checkstyle unused import removal
deepthi912 Aug 12, 2024
de79260
remove unused code to get the columns
deepthi912 Aug 13, 2024
36beb11
refactoring and review comments fixes
deepthi912 Aug 13, 2024
0ee036f
checkstyle changes for the condition
deepthi912 Aug 13, 2024
c8ba979
checkstyle exception for import
deepthi912 Aug 13, 2024
e1c7c64
change the flag name
deepthi912 Aug 13, 2024
b9e4fbf
review comments on variable names and response
deepthi912 Aug 14, 2024
eef4de5
test to check for reload segment when adding index
deepthi912 Aug 14, 2024
9b904c6
Remove unimplemented method and make private
deepthi912 Aug 14, 2024
38992b1
Change error responses
deepthi912 Aug 14, 2024
d10b8b2
remove unused imports
deepthi912 Aug 14, 2024
12656ec
Remove serverInstanceId from the response
deepthi912 Aug 14, 2024
b987f0e
Fix the test
deepthi912 Aug 14, 2024
c6aa9ca
Add server instance id back
deepthi912 Aug 15, 2024
8848fa6
review comments fixes
deepthi912 Aug 16, 2024
048eb50
review comments fixes
deepthi912 Aug 16, 2024
e439b54
remove unnecessary code here
deepthi912 Aug 16, 2024
028449c
add license header to the class
deepthi912 Aug 16, 2024
5c50cca
Add Controller Side API and test in integration
deepthi912 Aug 16, 2024
8366580
Refactoring and checkstyle fixes
deepthi912 Aug 17, 2024
25ab04c
import issue
deepthi912 Aug 17, 2024
760450f
unused import removal
deepthi912 Aug 17, 2024
ff0f1a1
checkstyle fixes
deepthi912 Aug 17, 2024
a5880c2
checkstyle exception fix
deepthi912 Aug 17, 2024
a0006c9
Refactoring changes and test failures fixes
deepthi912 Aug 17, 2024
9bac29a
Merge pull request #20 from apache/master
deepthi912 Aug 19, 2024
84bb84b
Code Readability and serializer tests
deepthi912 Aug 22, 2024
5491662
checkstyle fixes
deepthi912 Aug 22, 2024
8173c1c
checkstyle fixes
deepthi912 Aug 22, 2024
3649c78
remove unused method
deepthi912 Aug 22, 2024
a14af52
add json properties
deepthi912 Aug 22, 2024
ad11da4
Review Comments Fixes
deepthi912 Aug 26, 2024
468b36d
Naming fixes
deepthi912 Aug 26, 2024
d1b330a
Naming changes
deepthi912 Aug 26, 2024
ff056b0
Update to Boolean variable
deepthi912 Aug 28, 2024
33a2408
Changes in names
deepthi912 Aug 28, 2024
3d7a0ee
Merge branch 'Reload_Segment_Check_12117' of https://github.com/deept…
deepthi912 Aug 28, 2024
5f555e7
checkstyle fixes
deepthi912 Aug 28, 2024
efa1766
change the format of the comment
deepthi912 Aug 28, 2024
00d2220
test case fix
deepthi912 Aug 28, 2024
aa3a722
test case random failures
deepthi912 Aug 28, 2024
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;


/**
* Response of a server-side check for whether any segments of a table need to be reloaded.
*
* It carries the server instance id and a flag that is true if at least one segment needs a reload and false otherwise.
*/
public class SegmentsReloadCheckResponse {
private final boolean _needReload;
private final String _serverInstanceId;
Contributor:

(minor) instanceId might be more concise because this is always returned from the server.

Do we really need to return it? The controller should already have this info from reading the address of the server.

Collaborator Author:

We do have it, but separating out the server instance is a little complex when doing a multi-get request, so I am also returning it in the response to keep things simple. I will look into this later. For now, I think we can include instanceId as well.
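The trade-off in this thread can be sketched as follows. When each response carries its own instance id, the caller can index the results of a multi-get without keeping a separate endpoint-to-instance mapping. This is only an illustration: `ServerReloadCheck`, `byInstance`, and the instance ids are hypothetical names, not the controller code from this PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReloadCheckAggregation {
  /** Hypothetical stand-in for one server's reload-check response. */
  public static final class ServerReloadCheck {
    final String instanceId;
    final boolean needReload;

    ServerReloadCheck(String instanceId, boolean needReload) {
      this.instanceId = instanceId;
      this.needReload = needReload;
    }
  }

  /** Indexes responses by the instance id each one carries. */
  static Map<String, Boolean> byInstance(List<ServerReloadCheck> responses) {
    Map<String, Boolean> result = new LinkedHashMap<>();
    for (ServerReloadCheck response : responses) {
      result.put(response.instanceId, response.needReload);
    }
    return result;
  }

  /** True if at least one server reported that a reload is needed. */
  static boolean anyNeedsReload(Map<String, Boolean> byInstance) {
    return byInstance.containsValue(Boolean.TRUE);
  }

  public static void main(String[] args) {
    // Instance ids below are made up for the example.
    List<ServerReloadCheck> responses = List.of(
        new ServerReloadCheck("Server_host1_8098", false),
        new ServerReloadCheck("Server_host2_8098", true));
    Map<String, Boolean> indexed = byInstance(responses);
    System.out.println(indexed + " anyNeedsReload=" + anyNeedsReload(indexed));
  }
}
```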


public String getServerInstanceId() {
return _serverInstanceId;
}

public boolean getReload() {
Contributor:

This method name might not work for JSON serialization. I think it will be mistakenly serialized as reload. Can you add a test to verify? We might need to rename it to isNeedReload(), or explicitly annotate it with @JsonProperty("needReload"). If we explicitly annotate, a better name could be isReloadNeeded()
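The naming concern can be illustrated without Jackson by using the JDK's `java.beans.Introspector`, which applies the standard JavaBeans getter-to-property rule. This is a stand-in, not a substitute for the serializer test requested above: Jackson has its own naming logic, and although its default is expected to produce the same names for these two getters, that should be confirmed with a real `ObjectMapper` round trip. The classes `AsWritten` and `Renamed` are hypothetical.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

final class GetterNamingDemo {
  /** Mirrors the getter name in the diff. */
  public static final class AsWritten {
    public boolean getReload() {
      return true;
    }
  }

  /** Mirrors the rename suggested in the review. */
  public static final class Renamed {
    public boolean isNeedReload() {
      return true;
    }
  }

  /** Returns the JavaBeans property names derived from the public getters of a class. */
  static Set<String> propertyNames(Class<?> beanClass) {
    try {
      Set<String> names = new TreeSet<>();
      // Object.class as the stop class excludes the inherited "class" property.
      for (PropertyDescriptor descriptor
          : Introspector.getBeanInfo(beanClass, Object.class).getPropertyDescriptors()) {
        names.add(descriptor.getName());
      }
      return names;
    } catch (IntrospectionException e) {
      throw new IllegalStateException("Could not introspect " + beanClass, e);
    }
  }

  public static void main(String[] args) {
    System.out.println("getReload()    -> " + propertyNames(AsWritten.class));
    System.out.println("isNeedReload() -> " + propertyNames(Renamed.class));
  }
}
```

Under the JavaBeans rule the first getter yields a property called `reload`, which would not match the `needReload` name expected by the `@JsonCreator` constructor.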

return _needReload;
}

@JsonCreator
public SegmentsReloadCheckResponse(@JsonProperty("needReload") boolean needReload,
@JsonProperty("serverInstanceId") String serverInstanceId) {
_needReload = needReload;
_serverInstanceId = serverInstanceId;
}
}
@@ -1013,6 +1013,22 @@ public boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoading
}
}

boolean needReloadSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig)
Contributor:

Make it private

throws Exception {
String segmentName = zkMetadata.getSegmentName();
SegmentDirectory segmentDirectory =
tryInitSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
Contributor:

This can cause an exception for consuming segments. We should only check immutable segments. We can read the SegmentDirectory out from ImmutableSegmentImpl. We can consider adding a test in BaseClusterIntegrationTestSet.testReload() to verify that the API works. You may also add the test after the controller side integration.

Collaborator Author:

I now check for an ImmutableSegmentImpl instance before passing the segment through needReload, where the SegmentDirectory is generated. I think this should avoid any exceptions for consuming segments. I will add more tests along with the controller side code and combine them in the same PR.
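The guard described in this exchange amounts to a type check before the per-segment test. The sketch below uses hypothetical `Segment`, `ImmutableSegment`, and `ConsumingSegment` types in place of Pinot's classes, and a boolean field in place of the real preprocess check, so it shows the shape of the logic only.

```java
import java.util.List;

final class ImmutableOnlyReloadCheck {
  /** Hypothetical minimal segment abstraction. */
  interface Segment {
    String getName();
  }

  /** Stand-in for an immutable (completed) segment; the flag replaces the real index comparison. */
  public static final class ImmutableSegment implements Segment {
    private final String name;
    private final boolean needsPreprocess;

    ImmutableSegment(String name, boolean needsPreprocess) {
      this.name = name;
      this.needsPreprocess = needsPreprocess;
    }

    @Override
    public String getName() {
      return name;
    }

    boolean needsPreprocess() {
      return needsPreprocess;
    }
  }

  /** Stand-in for a consuming (mutable) segment, which the check must skip. */
  public static final class ConsumingSegment implements Segment {
    private final String name;

    ConsumingSegment(String name) {
      this.name = name;
    }

    @Override
    public String getName() {
      return name;
    }
  }

  /** Returns true as soon as one immutable segment needs a reload; consuming segments are ignored. */
  static boolean anyImmutableNeedsReload(List<Segment> segments) {
    for (Segment segment : segments) {
      if (!(segment instanceof ImmutableSegment)) {
        continue;
      }
      if (((ImmutableSegment) segment).needsPreprocess()) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<Segment> segments = List.of(
        new ConsumingSegment("seg__0__5"),
        new ImmutableSegment("seg__0__4", true));
    System.out.println("needReload=" + anyImmutableNeedsReload(segments));
  }
}
```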

try {
Schema schema = indexLoadingConfig.getSchema();
// Return true if the segment needs preprocessing, i.e. a reload
return ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema);
} catch (Exception e) {
Contributor:

We can probably throw the exception directly without a catch.

Collaborator Author:

closeSegmentDirectoryQuietly(segmentDirectory) has to run in the exception case, which is why I wrote a catch block. Do you want me to handle this in a finally block instead?

Contributor:

If we can use the SegmentDirectory within ImmutableSegmentImpl, we don't need to close it here. Ideally we don't want to create a new SegmentDirectory in this check.

Collaborator Author:

Done

_logger.error("Failed to check if reload is needed for a segment {}", segmentName, e);
Member:

Can we emit a server metric to indicate that something is wrong when running the check?

Contributor:

I feel that might be overkill. IMO we can simply throw the exception, and the client can handle it accordingly.

closeSegmentDirectoryQuietly(segmentDirectory);
throw new Exception(String.format("Failed to perform reload check on the segment %s", segmentName));
}
}
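One detail of the catch block in this revision: the new `Exception` is created from a message only, so the caller receives the segment name but not the original failure, which survives only in the server log. If wrapping is kept rather than rethrowing as the review suggests, passing the caught exception as the cause preserves it. A minimal sketch, with a hypothetical `needPreprocess` stand-in that always fails:

```java
final class RethrowWithCause {
  /** Hypothetical stand-in for the per-segment check; it always fails to exercise the catch path. */
  static boolean needPreprocess(String segmentName) {
    throw new IllegalStateException("index metadata unreadable for " + segmentName);
  }

  /** Wraps any failure with segment context while keeping the original exception as the cause. */
  static boolean needReloadSegment(String segmentName)
      throws Exception {
    try {
      return needPreprocess(segmentName);
    } catch (Exception e) {
      // Passing `e` as the second argument keeps its message and stack trace reachable via getCause().
      throw new Exception(String.format("Failed to perform reload check on the segment %s", segmentName), e);
    }
  }

  public static void main(String[] args) {
    try {
      needReloadSegment("mySegment_0");
    } catch (Exception e) {
      System.out.println(e.getMessage() + " <- caused by: " + e.getCause());
    }
  }
}
```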

@Nullable
private SegmentDirectory tryInitSegmentDirectory(String segmentName, String segmentCrc,
IndexLoadingConfig indexLoadingConfig) {
@@ -1024,6 +1040,28 @@ private SegmentDirectory tryInitSegmentDirectory(String segmentName, String segm
}
}

@Override
public boolean needReloadSegments()
throws Exception {
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
boolean needReload = false;
try {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
SegmentZKMetadata segmentZKMetadata = fetchZKMetadata(segmentDataManager.getSegmentName());
Contributor:

Let's not read ZK metadata because it needs to access ZK, and we might end up reading it for thousands of segments.

if (needReloadSegment(segmentZKMetadata, indexLoadingConfig)) {
needReload = true;
break;
}
}
} finally {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
releaseSegment(segmentDataManager);
}
}
return needReload;
}
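The acquire, check, release structure of `needReloadSegments()` can be isolated in a small sketch. It shows that placing the release loop in `finally` covers both the early exit on the first match and a failure thrown by the check. `Handle` and the predicate are hypothetical stand-ins for `SegmentDataManager` and the per-segment check, and the early `return` replaces the flag-and-break used in the diff.

```java
import java.util.List;
import java.util.function.Predicate;

final class AcquireCheckRelease {
  /** Hypothetical segment handle; `released` records whether release() was called. */
  public static final class Handle {
    final String name;
    boolean released;

    Handle(String name) {
      this.name = name;
    }

    void release() {
      released = true;
    }
  }

  /** Returns true on the first handle that needs a reload; every acquired handle is released either way. */
  static boolean anyNeedsReload(List<Handle> acquired, Predicate<Handle> needsReload) {
    try {
      for (Handle handle : acquired) {
        if (needsReload.test(handle)) {
          return true;
        }
      }
      return false;
    } finally {
      // Runs after a normal return, an early return, or an exception from the predicate.
      for (Handle handle : acquired) {
        handle.release();
      }
    }
  }

  public static void main(String[] args) {
    List<Handle> handles = List.of(new Handle("seg_0"), new Handle("seg_1"), new Handle("seg_2"));
    boolean needReload = anyNeedsReload(handles, handle -> handle.name.equals("seg_1"));
    System.out.println("needReload=" + needReload + ", lastReleased=" + handles.get(2).released);
  }
}
```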

private SegmentDirectory initSegmentDirectory(String segmentName, String segmentCrc,
IndexLoadingConfig indexLoadingConfig)
throws Exception {
@@ -49,6 +49,7 @@
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -289,6 +290,30 @@ public void testReloadSegmentAddIndex()
assertTrue(hasInvertedIndex(indexDir, LONG_COLUMN));
}

@Test
public void testNeedReloadSegmentAddIndex()
throws Exception {
File indexDir = createSegment(SegmentVersion.v3, 5);
long crc = getCRC(indexDir);
assertFalse(hasInvertedIndex(indexDir, STRING_COLUMN));
assertFalse(hasInvertedIndex(indexDir, LONG_COLUMN));

// Same CRCs so load the local segment directory directly.
SegmentZKMetadata zkMetadata = mock(SegmentZKMetadata.class);
when(zkMetadata.getCrc()).thenReturn(crc);
when(zkMetadata.getSegmentName()).thenReturn(SEGMENT_NAME);
SegmentMetadata localMetadata = mock(SegmentMetadata.class);
when(localMetadata.getCrc()).thenReturn(Long.toString(crc));
SegmentDirectory segmentDirectory = mock(SegmentDirectory.class);
// Require to add indices.
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(Arrays.asList(STRING_COLUMN, LONG_COLUMN)));
BaseTableDataManager tableDataManager = createTableManager();
assertTrue(tableDataManager.needReloadSegment(zkMetadata, indexLoadingConfig));
tableDataManager.reloadSegment(SEGMENT_NAME, indexLoadingConfig, zkMetadata, localMetadata, null, false);
assertFalse(tableDataManager.needReloadSegment(zkMetadata, indexLoadingConfig));
}

@Test
public void testReloadSegmentForceDownload()
throws Exception {
@@ -122,6 +122,13 @@ void addNewOnlineSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexL
*/
boolean tryLoadExistingSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig);

/**
* Check if reload is needed for any of the segments of a table
* @return true if reload is needed for any of the segments and false otherwise
*/
boolean needReloadSegments()
throws Exception;

/**
* Downloads a segment and loads it into the table.
* NOTE: This method is part of the implementation detail of {@link #addOnlineSegment(String)}.
@@ -64,6 +64,7 @@
import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
import org.apache.pinot.common.restlet.resources.ResourceUtils;
import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.common.restlet.resources.SegmentsReloadCheckResponse;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
import org.apache.pinot.common.restlet.resources.TableSegments;
@@ -952,4 +953,28 @@ public TableSegmentValidationInfo validateTableSegmentState(
}
return new TableSegmentValidationInfo(true, maxEndTimeMs);
}

@GET
@Path("/tables/{tableName}/segments/needReload")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Checks if reload is needed on any segment", notes = "Returns true if reload is required on"
+ " any segment in this server")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success", response = TableSegments.class), @ApiResponse(code = 500,
message = "Internal Server error", response = ErrorInfo.class)
})
public String checkSegmentsReload(
@ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName,
@Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
boolean needReload = false;
try {
needReload = tableDataManager.needReloadSegments();
} catch (Exception e) {
throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
}
return ResourceUtils.convertToJsonString(
new SegmentsReloadCheckResponse(needReload, tableDataManager.getInstanceId()));
}
}
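For orientation, a client request against the new endpoint could be built as in the sketch below. Only the path template `/tables/{tableName}/segments/needReload` and the JSON media type come from the diff. The base URL, port, and table name are placeholders, and the example assumes the table name contains only URL-safe characters. The request is built but not sent, since the response field names depend on how the getter naming discussed above is resolved.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class NeedReloadRequest {
  /** Builds a GET request for the server-side reload check of one table. */
  static HttpRequest build(String serverBaseUrl, String tableNameWithType) {
    // Assumes tableNameWithType needs no escaping, e.g. "myTable_OFFLINE".
    URI uri = URI.create(serverBaseUrl + "/tables/" + tableNameWithType + "/segments/needReload");
    return HttpRequest.newBuilder(uri)
        .header("Accept", "application/json")
        .GET()
        .build();
  }

  public static void main(String[] args) {
    // Placeholder address; substitute the admin endpoint of an actual server.
    HttpRequest request = build("http://localhost:8097", "myTable_OFFLINE");
    System.out.println(request.method() + " " + request.uri());
  }
}
```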