Skip to content

Commit

Permalink
Fix crc mismatch during deepstore upload retry task (#14506)
Browse files Browse the repository at this point in the history
* Fix crc mismatch during deepstore upload retry task

* Address comments, add new API endpoint for crc

* update crc post segment upload

* Upload v2 API implementation

* address comments
  • Loading branch information
tibrewalpratik17 authored Dec 3, 2024
1 parent d0d4419 commit e5d5bad
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.restlet.resources;

import com.fasterxml.jackson.annotation.JsonProperty;


/**
 * JSON payload describing the outcome of a server-side LLC segment upload to the segment store: the name of the
 * uploaded segment, its crc, and the url the uploaded copy can be downloaded from.
 *
 * <p>Instances are immutable. The constructor parameters are bound to the JSON properties {@code segmentName},
 * {@code crc} and {@code downloadUrl}.
 */
public class TableLLCSegmentUploadResponse {
  private final String _segmentName;
  private final long _crc;
  private final String _downloadUrl;

  /**
   * @param segmentName name of the uploaded segment
   * @param crc crc of the uploaded segment; must not be {@code null} because it is stored as a primitive
   * @param downloadUrl url of the uploaded segment copy
   * @throws NullPointerException if {@code crc} is {@code null}
   */
  public TableLLCSegmentUploadResponse(@JsonProperty("segmentName") String segmentName,
      @JsonProperty("crc") Long crc, @JsonProperty("downloadUrl") String downloadUrl) {
    // Fail with an explicit message instead of an anonymous unboxing NPE when the payload carries no crc.
    if (crc == null) {
      throw new NullPointerException("crc must not be null in TableLLCSegmentUploadResponse");
    }
    _segmentName = segmentName;
    _crc = crc;
    _downloadUrl = downloadUrl;
  }

  /** Returns the name of the uploaded segment. */
  public String getSegmentName() {
    return _segmentName;
  }

  /** Returns the crc of the uploaded segment; never {@code null}. */
  public Long getCrc() {
    return _crc;
  }

  /** Returns the url of the uploaded segment copy. */
  public String getDownloadUrl() {
    return _downloadUrl;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.restlet.resources.EndReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.common.utils.http.HttpClientConfig;
import org.apache.pinot.spi.auth.AuthProvider;
Expand Down Expand Up @@ -963,6 +964,32 @@ public String uploadToSegmentStore(String uri)
return downloadUrl;
}

/**
 * Controller-to-server call: issues a POST to the given uri so that the server uploads a committed LLC segment to
 * the segment store, then parses the JSON reply into a {@link TableLLCSegmentUploadResponse}.
 *
 * @param uri server endpoint (including any query parameters) that triggers the upload
 * @return the parsed {@link TableLLCSegmentUploadResponse}, guaranteed to carry a non-empty download url
 * @throws URISyntaxException if {@code uri} cannot be parsed as a {@link URI}
 * @throws IOException if sending the request or parsing the reply fails
 * @throws HttpErrorStatusException if the reply contains a null or empty download url
 *     (NOTE(review): presumably also raised by wrapAndThrowHttpException on error statuses - confirm)
 */
public TableLLCSegmentUploadResponse uploadLLCToSegmentStore(String uri)
    throws URISyntaxException, IOException, HttpErrorStatusException {
  ClassicRequestBuilder postRequest = ClassicRequestBuilder.post(new URI(uri)).setVersion(HttpVersion.HTTP_1_1);
  // The response status code is checked as part of sending the request
  SimpleHttpResponse serverResponse = HttpClient.wrapAndThrowHttpException(
      _httpClient.sendRequest(postRequest.build(), HttpClient.DEFAULT_SOCKET_TIMEOUT_MS));
  TableLLCSegmentUploadResponse uploadResponse =
      JsonUtils.stringToObject(serverResponse.getResponse(), TableLLCSegmentUploadResponse.class);

  // A usable reply must tell us where the uploaded segment can be fetched from
  String downloadUrl = uploadResponse.getDownloadUrl();
  if (downloadUrl != null && !downloadUrl.isEmpty()) {
    return uploadResponse;
  }
  throw new HttpErrorStatusException(
      String.format("Returned segment download url is empty after requesting servers to upload by the path: %s",
          uri), serverResponse.getStatusCode());
}

/**
* Send segment uri.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
Expand Down Expand Up @@ -1557,22 +1558,43 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe

// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
String segmentDownloadUrl =
moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);

// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
try {
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "uploadLLCSegment");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
TableLLCSegmentUploadResponse tableLLCSegmentUploadResponse
= _fileUploadDownloadClient.uploadLLCToSegmentStore(serverUploadRequestUrl);
String segmentDownloadUrl =
moveSegmentFile(rawTableName, segmentName, tableLLCSegmentUploadResponse.getDownloadUrl(), pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// Update ZK crc to that of the server segment crc if unmatched
if (tableLLCSegmentUploadResponse.getCrc() != segmentZKMetadata.getCrc()) {
LOGGER.info("Updating segment {} crc in ZK to be {} from previous {}", segmentName,
tableLLCSegmentUploadResponse.getCrc(), segmentZKMetadata.getCrc());
segmentZKMetadata.setCrc(tableLLCSegmentUploadResponse.getCrc());
}
} catch (Exception e) {
// this is a fallback call for backward compatibility to the original API /upload in pinot-server
// should be deprecated in the long run
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
serverUploadRequestUrl =
String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
String segmentDownloadUrl = moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
}
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
segmentDownloadUrl);
segmentZKMetadata.getDownloadUrl());
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.restlet.resources.TableLLCSegmentUploadResponse;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
Expand Down Expand Up @@ -1045,6 +1046,134 @@ public void testUploadToSegmentStore()
assertNull(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(4), null).getDownloadUrl());
}

/**
 * Test cases for fixing LLC segments through the {@code uploadLLCSegment} server endpoint when the segment is
 * missing from the segment store. Covers five segments:
 * <ol>
 *   <li>DONE with peer download url, upload succeeds: download url and crc are updated</li>
 *   <li>DONE with peer download url, upload fails: download url is left untouched</li>
 *   <li>DONE with peer download url, no ONLINE replica: download url is left untouched</li>
 *   <li>DONE with a real download url: download url is left untouched</li>
 *   <li>IN_PROGRESS: download url stays null</li>
 * </ol>
 */
@Test
public void testUploadToSegmentStoreV2()
    throws HttpErrorStatusException, IOException, URISyntaxException {
  // mock the behavior for PinotHelixResourceManager
  PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
  HelixManager helixManager = mock(HelixManager.class);
  HelixAdmin helixAdmin = mock(HelixAdmin.class);
  ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore =
      (ZkHelixPropertyStore<ZNRecord>) mock(ZkHelixPropertyStore.class);
  when(pinotHelixResourceManager.getHelixZkManager()).thenReturn(helixManager);
  when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
  when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME);
  when(pinotHelixResourceManager.getPropertyStore()).thenReturn(zkHelixPropertyStore);

  // init fake PinotLLCRealtimeSegmentManager
  ControllerConf controllerConfig = new ControllerConf();
  controllerConfig.setProperty(ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT,
      true);
  controllerConfig.setDataDir(TEMP_DIR.toString());
  FakePinotLLCRealtimeSegmentManager segmentManager =
      new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig);
  Assert.assertTrue(segmentManager.isDeepStoreLLCSegmentUploadRetryEnabled());

  // Set up a new table with 2 replicas, 5 instances, 5 partitions.
  setUpNewTable(segmentManager, 2, 5, 5);
  SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig =
      new SegmentsValidationAndRetentionConfig();
  segmentsValidationAndRetentionConfig.setRetentionTimeUnit(TimeUnit.DAYS.toString());
  segmentsValidationAndRetentionConfig.setRetentionTimeValue("3");
  segmentManager._tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig);
  List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(segmentManager._segmentZKMetadataMap.values());
  Assert.assertEquals(segmentsZKMetadata.size(), 5);

  // Set up external view for this table
  ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME);
  when(helixAdmin.getResourceExternalView(CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(externalView);

  // Change 1st segment status to be DONE, but with default peer download url.
  // Verify later the download url and crc are fixed after upload success.
  segmentsZKMetadata.get(0).setStatus(Status.DONE);
  segmentsZKMetadata.get(0).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
  // set up the external view for 1st segment
  String instance0 = "instance0";
  int adminPort = 2077;
  externalView.setState(segmentsZKMetadata.get(0).getSegmentName(), instance0, "ONLINE");
  InstanceConfig instanceConfig0 = new InstanceConfig(instance0);
  instanceConfig0.setHostName(instance0);
  instanceConfig0.getRecord().setIntField(Instance.ADMIN_PORT_KEY, adminPort);
  when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance0)).thenReturn(instanceConfig0);
  // mock the request/response for 1st segment upload
  String serverUploadRequestUrl0 =
      String.format("http://%s:%d/segments/%s/%s/uploadLLCSegment?uploadTimeoutMs=-1", instance0, adminPort,
          REALTIME_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName());
  // tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends
  // with a random UUID
  File tempSegmentFileLocation = new File(TEMP_DIR, segmentsZKMetadata.get(0).getSegmentName() + UUID.randomUUID());
  FileUtils.write(tempSegmentFileLocation, "test");
  // After the deep-store retry task gets the segment location returned by Pinot server, it will move the segment to
  // its final location. This is the expected segment location.
  String expectedSegmentLocation =
      segmentManager.createSegmentPath(RAW_TABLE_NAME, segmentsZKMetadata.get(0).getSegmentName()).toString();
  // crc reported by the server for the uploaded copy; the retry task must persist it to ZK metadata
  long expectedCrc = 12345678L;
  when(segmentManager._mockedFileUploadDownloadClient.uploadLLCToSegmentStore(serverUploadRequestUrl0)).thenReturn(
      new TableLLCSegmentUploadResponse(segmentsZKMetadata.get(0).getSegmentName(), expectedCrc,
          tempSegmentFileLocation.getPath()));

  // Change 2nd segment status to be DONE, but with default peer download url.
  // Verify later the download url isn't fixed after upload failure.
  segmentsZKMetadata.get(1).setStatus(Status.DONE);
  segmentsZKMetadata.get(1).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
  // set up the external view for 2nd segment
  String instance1 = "instance1";
  externalView.setState(segmentsZKMetadata.get(1).getSegmentName(), instance1, "ONLINE");
  InstanceConfig instanceConfig1 = new InstanceConfig(instance1);
  instanceConfig1.setHostName(instance1);
  instanceConfig1.getRecord().setIntField(Instance.ADMIN_PORT_KEY, adminPort);
  when(helixAdmin.getInstanceConfig(CLUSTER_NAME, instance1)).thenReturn(instanceConfig1);
  // mock the request/response for 2nd segment upload
  String serverUploadRequestUrl1 =
      String.format("http://%s:%d/segments/%s/%s/uploadLLCSegment?uploadTimeoutMs=-1", instance1, adminPort,
          REALTIME_TABLE_NAME, segmentsZKMetadata.get(1).getSegmentName());
  when(segmentManager._mockedFileUploadDownloadClient.uploadLLCToSegmentStore(serverUploadRequestUrl1)).thenThrow(
      new HttpErrorStatusException("failed to upload segment",
          Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()));

  // Change 3rd segment status to be DONE, but with default peer download url.
  // Verify later the download url isn't fixed because no ONLINE replica found in any server.
  segmentsZKMetadata.get(2).setStatus(Status.DONE);
  segmentsZKMetadata.get(2).setDownloadUrl(METADATA_URI_FOR_PEER_DOWNLOAD);
  // set up the external view for 3rd segment
  String instance2 = "instance2";
  externalView.setState(segmentsZKMetadata.get(2).getSegmentName(), instance2, "OFFLINE");

  // Change 4th segment status to be DONE and with segment download url.
  // Verify later the download url is still the same.
  String defaultDownloadUrl = "canItBeDownloaded";
  segmentsZKMetadata.get(3).setStatus(Status.DONE);
  segmentsZKMetadata.get(3).setDownloadUrl(defaultDownloadUrl);

  // Keep 5th segment status as IN_PROGRESS.

  List<String> segmentNames =
      segmentsZKMetadata.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList());
  when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(segmentManager._tableConfig);

  // Verify the result
  segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata);

  // Block until all tasks have been able to complete
  TestUtils.waitForCondition(aVoid -> segmentManager.deepStoreUploadExecutorPendingSegmentsIsEmpty(), 30_000L,
      "Timed out waiting for upload retry tasks to finish");

  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(),
      expectedSegmentLocation);
  // The crc reported by the server must have been written to the segment ZK metadata
  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getCrc(),
      expectedCrc);
  assertFalse(tempSegmentFileLocation.exists(),
      "Deep-store retry task should move the file from temp location to permanent location");

  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(1), null).getDownloadUrl(),
      METADATA_URI_FOR_PEER_DOWNLOAD);
  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(2), null).getDownloadUrl(),
      METADATA_URI_FOR_PEER_DOWNLOAD);
  assertEquals(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(3), null).getDownloadUrl(),
      defaultDownloadUrl);
  assertNull(segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(4), null).getDownloadUrl());
}

@Test
public void testDeleteTmpSegmentFiles()
throws Exception {
Expand Down
Loading

0 comments on commit e5d5bad

Please sign in to comment.