From 8f59a8d58f79fe5a0f8f93ebd210b3dca169abb1 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Wed, 15 May 2024 12:14:39 +0530 Subject: [PATCH 1/9] [Remote Store] Add segment transfer timeout dynamic setting Signed-off-by: Varun Bansal --- .../common/settings/ClusterSettings.java | 1 + .../opensearch/index/shard/IndexShard.java | 3 ++- .../shard/RemoteStoreRefreshListener.java | 14 ++++++++-- .../indices/RemoteStoreSettings.java | 26 +++++++++++++++++++ .../RemoteStoreRefreshListenerTests.java | 22 +++++++++++++--- 5 files changed, 60 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 4a5a45eb1a17a..33c0a498fc449 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -737,6 +737,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, + RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 18d4a2ca6d639..42634d2129faa 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3970,7 +3970,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro new RemoteStoreRefreshListener( this, 
this.checkpointPublisher, - remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId()) + remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId()), + remoteStoreSettings ) ); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 351aec6e3af6c..8a3a4f4564050 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -33,6 +33,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.threadpool.ThreadPool; @@ -45,6 +46,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -89,11 +91,13 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh private volatile long primaryTerm; private volatile Iterator backoffDelayIterator; private final SegmentReplicationCheckpointPublisher checkpointPublisher; + private final RemoteStoreSettings remoteStoreSettings; public RemoteStoreRefreshListener( IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher, - RemoteSegmentTransferTracker segmentTracker + RemoteSegmentTransferTracker segmentTracker, + RemoteStoreSettings remoteStoreSettings ) { super(indexShard.getThreadPool()); logger = Loggers.getLogger(getClass(), indexShard.shardId()); @@ -116,6 +120,7 @@ public 
RemoteStoreRefreshListener( this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; + this.remoteStoreSettings = remoteStoreSettings; } @Override @@ -286,7 +291,12 @@ public void onFailure(Exception e) { // Start the segments files upload uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener); - latch.await(); + if (latch.await( + remoteStoreSettings.getClusterRemoteSegmentTransferTimeout().millis(), + TimeUnit.MILLISECONDS + ) == false) { + throw new InterruptedException("Timeout while waiting for remote segment transfer to complete"); + } } catch (EngineException e) { logger.warn("Exception while reading SegmentInfosSnapshot", e); } diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 0bd4c7aedfc03..1365dde986687 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -105,9 +105,21 @@ public class RemoteStoreSettings { Property.NodeScope ); + /** + * Controls timeout value while uploading segment files to remote segment store + */ + public static final Setting CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.segment.transfer_timeout", + TimeValue.timeValueHours(3), + TimeValue.timeValueMinutes(10), + Property.NodeScope, + Property.Dynamic + ); + private volatile TimeValue clusterRemoteTranslogBufferInterval; private volatile int minRemoteSegmentMetadataFiles; private volatile TimeValue clusterRemoteTranslogTransferTimeout; + private volatile TimeValue clusterRemoteSegmentTransferTimeout; private volatile RemoteStoreEnums.PathType pathType; private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm; private volatile int maxRemoteTranslogReaders; @@ -139,6 +151,12 @@ public 
RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders); + + clusterRemoteSegmentTransferTimeout = CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING, + this::setClusterRemoteSegmentTransferTimeout + ); } public TimeValue getClusterRemoteTranslogBufferInterval() { @@ -161,10 +179,18 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() { return clusterRemoteTranslogTransferTimeout; } + public TimeValue getClusterRemoteSegmentTransferTimeout() { + return clusterRemoteSegmentTransferTimeout; + } + private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) { this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout; } + private void setClusterRemoteSegmentTransferTimeout(TimeValue clusterRemoteSegmentTransferTimeout) { + this.clusterRemoteSegmentTransferTimeout = clusterRemoteSegmentTransferTimeout; + } + @ExperimentalApi public RemoteStoreEnums.PathType getPathType() { return pathType; diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 33f6c67b94b3d..b9fe4a530d25e 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -34,6 +34,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils; import org.opensearch.index.store.Store; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; +import org.opensearch.indices.DefaultRemoteStoreSettings; import 
org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; @@ -90,7 +91,12 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY); remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard); RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker); + remoteStoreRefreshListener = new RemoteStoreRefreshListener( + indexShard, + SegmentReplicationCheckpointPublisher.EMPTY, + tracker, + DefaultRemoteStoreSettings.INSTANCE + ); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -175,7 +181,12 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); // Since the thrown IOException is caught in the constructor, ctor should be invoked successfully. - new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY, mock(RemoteSegmentTransferTracker.class)); + new RemoteStoreRefreshListener( + shard, + SegmentReplicationCheckpointPublisher.EMPTY, + mock(RemoteSegmentTransferTracker.class), + DefaultRemoteStoreSettings.INSTANCE + ); // Validate that the stream of metadata file of remoteMetadataDirectory has been opened only once and the // listFilesByPrefixInLexicographicOrder has been called twice. 
@@ -638,7 +649,12 @@ private Tuple mockIn RemoteStoreSettings remoteStoreSettings = mock(RemoteStoreSettings.class); when(remoteStoreSettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10); when(shard.getRemoteStoreSettings()).thenReturn(remoteStoreSettings); - RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker); + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener( + shard, + emptyCheckpointPublisher, + tracker, + DefaultRemoteStoreSettings.INSTANCE + ); refreshListener.afterRefresh(true); return Tuple.tuple(refreshListener, remoteStoreStatsTrackerFactory); } From d15ded5728069bf6d1d1599a174b1686a4ca43bf Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Mon, 20 May 2024 12:41:19 +0530 Subject: [PATCH 2/9] introduce a new exception for segment upload failures and handle interrupted latch.await Signed-off-by: Varun Bansal --- .../shard/RemoteStoreRefreshListener.java | 2 +- .../shard/SegmentUploadFailedException.java | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 3a411b2cec189..20afd7b2f3568 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -295,7 +295,7 @@ public void onFailure(Exception e) { remoteStoreSettings.getClusterRemoteSegmentTransferTimeout().millis(), TimeUnit.MILLISECONDS ) == false) { - throw new InterruptedException("Timeout while waiting for remote segment transfer to complete"); + throw new SegmentUploadFailedException("Timeout while waiting for remote segment transfer to complete"); } } catch (EngineException e) { 
logger.warn("Exception while reading SegmentInfosSnapshot", e); diff --git a/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java new file mode 100644 index 0000000000000..ecd7313ad0db6 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import java.io.IOException; + +public class SegmentUploadFailedException extends IOException { + + public SegmentUploadFailedException(String message) { + super(message); + } +} From 16eb7d2e3e3b6813944e15ebe7232fda01d5cc69 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Mon, 20 May 2024 23:22:26 +0530 Subject: [PATCH 3/9] add integ tests to cover segment upload timeouts Signed-off-by: Varun Bansal --- .../RemoteStoreRefreshListenerIT.java | 1 - ...RemoteStoreRefreshListenerMultipartIT.java | 98 +++++++++++++++++++ .../mocks/MockFsAsyncBlobContainer.java | 77 ++++++++------- .../opensearch/client/ClusterAdminClient.java | 2 + .../client/support/AbstractClient.java | 5 + .../indices/RemoteStoreSettings.java | 2 +- 6 files changed, 146 insertions(+), 39 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerMultipartIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 65016c4976157..7ae08bf968ade 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ 
b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -26,7 +26,6 @@ import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; -import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerMultipartIT.java new file mode 100644 index 0000000000000..305e61d4ad2b9 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerMultipartIT.java @@ -0,0 +1,98 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequest; +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreRefreshListenerMultipartIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { + + protected Collection> nodePlugins() { + return Arrays.asList(MockFsRepositoryPlugin.class); + } + + @Override + public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) { + Settings settings = super.buildRemoteStoreNodeAttributes(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure); + String segmentRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + REPOSITORY_NAME + ); + String translogRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + TRANSLOG_REPOSITORY_NAME + ); + + String stateRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." 
+ REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + REPOSITORY_NAME + ); + + return Settings.builder() + .put(settings) + .put(segmentRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE) + .put(translogRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE) + .put(stateRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE) + .build(); + } + + public void testRemoteRefreshSegmentUploadTimeout() throws Exception { + Path location = randomRepoPath().toAbsolutePath(); + setup(location, randomDoubleBetween(0.1, 0.15, true), "metadata", 10L); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false)) + .setPersistentSettings( + Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(1)) + ) + .get(); + + // Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed + // due to IOExceptions that are thrown while doing uploadBlobs. 
+ indexData(randomIntBetween(5, 10), randomBoolean()); + logger.info("--> Indexed data"); + logger.info("--> Verify that the segment upload fails"); + try { + assertBusy(() -> { + RemoteStoreStatsResponse remoteStoreStatsResponse = client().admin() + .cluster() + .remoteStoreStats(new RemoteStoreStatsRequest()) + .get(); + Arrays.asList(remoteStoreStatsResponse.getRemoteStoreStats()).forEach(remoteStoreStats -> { + assertTrue(remoteStoreStats.getSegmentStats().totalUploadsFailed > 10); + }); + }, 10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } + cleanupRepo(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index d45b4e3deb798..106aa0c0ef42d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -76,46 +76,49 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp }); thread.start(); } - try { - if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName()); - } - } catch (InterruptedException e) { - throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName()); - } - try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { - outputStream.write(buffer); - } - if (writeContext.getFileSize() != totalContentRead.get()) { - throw new IOException( - "Incorrect content length read for file " - + writeContext.getFileName() - + ", actual file size: " - + writeContext.getFileSize() - + ", bytes read: " - + totalContentRead.get() - ); - } - try { - 
// bulks need to succeed for segment files to be generated - if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) { - completionListener.onFailure( - new RuntimeException( - new CorruptIndexException( - "Data integrity check failure for file: " + writeContext.getFileName(), - writeContext.getFileName() + Thread thread = new Thread(() -> { + try { + try { + if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName()); + } + } catch (InterruptedException e) { + throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName()); + } + try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { + outputStream.write(buffer); + } + if (writeContext.getFileSize() != totalContentRead.get()) { + throw new IOException( + "Incorrect content length read for file " + + writeContext.getFileName() + + ", actual file size: " + + writeContext.getFileSize() + + ", bytes read: " + + totalContentRead.get() + ); + } + + // bulks need to succeed for segment files to be generated + if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) { + completionListener.onFailure( + new RuntimeException( + new CorruptIndexException( + "Data integrity check failure for file: " + writeContext.getFileName(), + writeContext.getFileName() + ) ) - ) - ); - } else { - writeContext.getUploadFinalizer().accept(true); - completionListener.onResponse(null); + ); + } else { + writeContext.getUploadFinalizer().accept(true); + completionListener.onResponse(null); + } + } catch (Exception e) { + completionListener.onFailure(e); } - } catch (Exception e) { - completionListener.onFailure(e); - } - + }); + thread.start(); } @Override diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java 
b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index 05f09c1a6e661..c6ed8b92ccc0b 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -322,6 +322,8 @@ public interface ClusterAdminClient extends OpenSearchClient { void remoteStoreStats(RemoteStoreStatsRequest request, ActionListener listener); + ActionFuture remoteStoreStats(RemoteStoreStatsRequest request); + RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId); /** diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 6c6049f04231b..44716abb447f2 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -922,6 +922,11 @@ public void remoteStoreStats(final RemoteStoreStatsRequest request, final Action execute(RemoteStoreStatsAction.INSTANCE, request, listener); } + @Override + public ActionFuture remoteStoreStats(final RemoteStoreStatsRequest request) { + return execute(RemoteStoreStatsAction.INSTANCE, request); + } + @Override public RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId) { RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = new RemoteStoreStatsRequestBuilder( diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 1365dde986687..9cc24927d4265 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -110,7 +110,7 @@ public class RemoteStoreSettings { */ public static final Setting CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting( 
"cluster.remote_store.segment.transfer_timeout", - TimeValue.timeValueHours(3), + TimeValue.timeValueHours(1), TimeValue.timeValueMinutes(10), Property.NodeScope, Property.Dynamic From b97c712e624984f257453355513f56f7c8aecf92 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Tue, 21 May 2024 02:37:54 +0530 Subject: [PATCH 4/9] remove integ test and add unit tests Signed-off-by: Varun Bansal --- ...RemoteStoreRefreshListenerMultipartIT.java | 98 ------------------- .../mocks/MockFsAsyncBlobContainer.java | 77 +++++++-------- .../opensearch/client/ClusterAdminClient.java | 2 - .../client/support/AbstractClient.java | 5 - .../RemoteStoreRefreshListenerTests.java | 75 ++++++++++++-- 5 files changed, 106 insertions(+), 151 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerMultipartIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerMultipartIT.java deleted file mode 100644 index 305e61d4ad2b9..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerMultipartIT.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.remotestore; - -import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequest; -import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.indices.RemoteStoreSettings; -import org.opensearch.plugins.Plugin; -import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; -import org.opensearch.test.OpenSearchIntegTestCase; - -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collection; -import java.util.Locale; -import java.util.concurrent.TimeUnit; - -import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreRefreshListenerMultipartIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { - - protected Collection> nodePlugins() { - return Arrays.asList(MockFsRepositoryPlugin.class); - } - - @Override - public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) { - Settings settings = super.buildRemoteStoreNodeAttributes(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure); - String segmentRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - REPOSITORY_NAME - ); - String translogRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - TRANSLOG_REPOSITORY_NAME - ); - - String stateRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." 
+ REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - REPOSITORY_NAME - ); - - return Settings.builder() - .put(settings) - .put(segmentRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE) - .put(translogRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE) - .put(stateRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE) - .build(); - } - - public void testRemoteRefreshSegmentUploadTimeout() throws Exception { - Path location = randomRepoPath().toAbsolutePath(); - setup(location, randomDoubleBetween(0.1, 0.15, true), "metadata", 10L); - - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false)) - .setPersistentSettings( - Settings.builder() - .put(RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(1)) - ) - .get(); - - // Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed - // due to IOExceptions that are thrown while doing uploadBlobs. 
- indexData(randomIntBetween(5, 10), randomBoolean()); - logger.info("--> Indexed data"); - logger.info("--> Verify that the segment upload fails"); - try { - assertBusy(() -> { - RemoteStoreStatsResponse remoteStoreStatsResponse = client().admin() - .cluster() - .remoteStoreStats(new RemoteStoreStatsRequest()) - .get(); - Arrays.asList(remoteStoreStatsResponse.getRemoteStoreStats()).forEach(remoteStoreStats -> { - assertTrue(remoteStoreStats.getSegmentStats().totalUploadsFailed > 10); - }); - }, 10, TimeUnit.SECONDS); - } catch (Exception e) { - throw new RuntimeException(e); - } - cleanupRepo(); - } -} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index 106aa0c0ef42d..d45b4e3deb798 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -76,49 +76,46 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp }); thread.start(); } + try { + if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName()); + } + } catch (InterruptedException e) { + throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName()); + } + try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { + outputStream.write(buffer); + } + if (writeContext.getFileSize() != totalContentRead.get()) { + throw new IOException( + "Incorrect content length read for file " + + writeContext.getFileName() + + ", actual file size: " + + writeContext.getFileSize() + + ", bytes read: " + + totalContentRead.get() + ); + } - Thread 
thread = new Thread(() -> { - try { - try { - if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName()); - } - } catch (InterruptedException e) { - throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName()); - } - try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { - outputStream.write(buffer); - } - if (writeContext.getFileSize() != totalContentRead.get()) { - throw new IOException( - "Incorrect content length read for file " - + writeContext.getFileName() - + ", actual file size: " - + writeContext.getFileSize() - + ", bytes read: " - + totalContentRead.get() - ); - } - - // bulks need to succeed for segment files to be generated - if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) { - completionListener.onFailure( - new RuntimeException( - new CorruptIndexException( - "Data integrity check failure for file: " + writeContext.getFileName(), - writeContext.getFileName() - ) + try { + // bulks need to succeed for segment files to be generated + if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) { + completionListener.onFailure( + new RuntimeException( + new CorruptIndexException( + "Data integrity check failure for file: " + writeContext.getFileName(), + writeContext.getFileName() ) - ); - } else { - writeContext.getUploadFinalizer().accept(true); - completionListener.onResponse(null); - } - } catch (Exception e) { - completionListener.onFailure(e); + ) + ); + } else { + writeContext.getUploadFinalizer().accept(true); + completionListener.onResponse(null); } - }); - thread.start(); + } catch (Exception e) { + completionListener.onFailure(e); + } + } @Override diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java 
b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index c6ed8b92ccc0b..05f09c1a6e661 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -322,8 +322,6 @@ public interface ClusterAdminClient extends OpenSearchClient { void remoteStoreStats(RemoteStoreStatsRequest request, ActionListener listener); - ActionFuture remoteStoreStats(RemoteStoreStatsRequest request); - RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId); /** diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 44716abb447f2..6c6049f04231b 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -922,11 +922,6 @@ public void remoteStoreStats(final RemoteStoreStatsRequest request, final Action execute(RemoteStoreStatsAction.INSTANCE, request, listener); } - @Override - public ActionFuture remoteStoreStats(final RemoteStoreStatsRequest request) { - return execute(RemoteStoreStatsAction.INSTANCE, request); - } - @Override public RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId) { RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = new RemoteStoreStatsRequestBuilder( diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index b9fe4a530d25e..728fdd364818a 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -23,6 +23,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; import 
org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.InternalEngineFactory; @@ -381,6 +382,36 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { assertNoLagAndTotalUploadsFailed(segmentTracker, 1); } + public void testSegmentUploadTimeout() throws Exception { + // This covers 2 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false + // Succeed on 2nd attempt + int succeedOnAttempt = 1; + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); + // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. + // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down + CountDownLatch successLatch = new CountDownLatch(2); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch, + 1, + new CountDownLatch(0), + true, + true + ); + tuple.v2().getRemoteSegmentTransferTracker(indexShard.shardId).getTotalUploadsFailed(); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(1, successLatch.getCount())); + RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2(); + RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); + assertBusy(() -> { + assertTrue(segmentTracker.getTotalUploadsFailed() > 1); + assertTrue(segmentTracker.getTotalUploadsSucceeded() < 2); + }); + indexShard.getThreadPool().shutdownNow(); + } + /** * Tests retry flow after snapshot and metadata files have been uploaded to remote store in the failed attempt. 
* Snapshot and metadata files created in failed attempt should not break retry. @@ -480,6 +511,7 @@ public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception { successLatch, checkpointPublishSucceedOnAttempt, reachedCheckpointPublishLatch, + false, false ); @@ -531,7 +563,8 @@ private Tuple mockIn successLatch, succeedCheckpointPublishOnAttempt, reachedCheckpointPublishLatch, - true + true, + false ); } @@ -541,7 +574,8 @@ private Tuple mockIn CountDownLatch successLatch, int succeedCheckpointPublishOnAttempt, CountDownLatch reachedCheckpointPublishLatch, - boolean mockPrimaryTerm + boolean mockPrimaryTerm, + boolean testUploadTimeout ) throws IOException { // Create index shard that we will be using to mock different methods in IndexShard for the unit test indexShard = newStartedShard( @@ -575,9 +609,22 @@ private Tuple mockIn // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) Store remoteStore = mock(Store.class); when(shard.remoteStore()).thenReturn(remoteStore); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) - .getDelegate(); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; + RemoteDirectory remoteDirectory = mock(RemoteDirectory.class); + ; + if (testUploadTimeout) { + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDirectory, + mock(RemoteDirectory.class), + mock(RemoteStoreLockManager.class), + indexShard.getThreadPool(), + indexShard.shardId + ); + } else { + remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore() + .directory()).getDelegate()).getDelegate(); + } + FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory)); 
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); @@ -649,11 +696,27 @@ private Tuple mockIn RemoteStoreSettings remoteStoreSettings = mock(RemoteStoreSettings.class); when(remoteStoreSettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10); when(shard.getRemoteStoreSettings()).thenReturn(remoteStoreSettings); + if (testUploadTimeout) { + when(remoteStoreSettings.getClusterRemoteSegmentTransferTimeout()).thenReturn(TimeValue.timeValueMillis(10)); + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(5); + indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + Thread.sleep(30000); + } catch (InterruptedException e) { + logger.warn("interrupted sleep"); + } + actionListener.onResponse(null); + }); + return true; + }).when(remoteDirectory).copyFrom(any(), any(), any(), any(), any(), any(ActionListener.class), any(Boolean.class)); + } + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener( shard, emptyCheckpointPublisher, tracker, - DefaultRemoteStoreSettings.INSTANCE + remoteStoreSettings ); refreshListener.afterRefresh(true); return Tuple.tuple(refreshListener, remoteStoreStatsTrackerFactory); From f4546d7fd6390ed6ea3504d3ea7cc3e9bf45663d Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Tue, 21 May 2024 02:46:48 +0530 Subject: [PATCH 5/9] reduce default timeout to 30 minutes Signed-off-by: Varun Bansal --- .../main/java/org/opensearch/indices/RemoteStoreSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 9cc24927d4265..5234a09a58541 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -110,7 +110,7 @@ public class RemoteStoreSettings { */ public static final Setting 
CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting( "cluster.remote_store.segment.transfer_timeout", - TimeValue.timeValueHours(1), + TimeValue.timeValueMinutes(30), TimeValue.timeValueMinutes(10), Property.NodeScope, Property.Dynamic From cbca4eea6e1a3ae83339c8af865d6a9e6e1111af Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Tue, 21 May 2024 11:00:40 +0530 Subject: [PATCH 6/9] update documentation Signed-off-by: Varun Bansal --- .../index/shard/RemoteStoreRefreshListenerTests.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 18febdaa121ce..217b9828e60bc 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -384,13 +384,10 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { } public void testSegmentUploadTimeout() throws Exception { - // This covers 2 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false - // Succeed on 2nd attempt + // This covers the case were segment upload fails due to timeout int succeedOnAttempt = 1; // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); - // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. 
- // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(2); Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, @@ -401,7 +398,6 @@ public void testSegmentUploadTimeout() throws Exception { true, true ); - tuple.v2().getRemoteSegmentTransferTracker(indexShard.shardId).getTotalUploadsFailed(); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(1, successLatch.getCount())); RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2(); @@ -410,6 +406,7 @@ public void testSegmentUploadTimeout() throws Exception { assertTrue(segmentTracker.getTotalUploadsFailed() > 1); assertTrue(segmentTracker.getTotalUploadsSucceeded() < 2); }); + // shutdown threadpool to avoid leaking threads indexShard.getThreadPool().shutdownNow(); } @@ -612,7 +609,7 @@ private Tuple mockIn when(shard.remoteStore()).thenReturn(remoteStore); RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; RemoteDirectory remoteDirectory = mock(RemoteDirectory.class); - ; + if (testUploadTimeout) { remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( remoteDirectory, @@ -705,7 +702,7 @@ private Tuple mockIn try { Thread.sleep(30000); } catch (InterruptedException e) { - logger.warn("interrupted sleep"); + logger.warn("copyFrom thread interrupted during sleep"); } actionListener.onResponse(null); }); From 62e73e52120ed3677a73bf310b2aba24d59e0af0 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Tue, 21 May 2024 11:38:02 +0530 Subject: [PATCH 7/9] add java doc for new exception Signed-off-by: Varun Bansal --- .../index/shard/SegmentUploadFailedException.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java index
ecd7313ad0db6..40b151942f01b 100644 --- a/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java +++ b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java @@ -10,8 +10,16 @@ import java.io.IOException; +/** + * Exception to be thrown when a segment upload fails. + */ public class SegmentUploadFailedException extends IOException { + /** + * Creates a new SegmentUploadFailedException. + * + * @param message error message + */ public SegmentUploadFailedException(String message) { super(message); } From a6417b473e68dcb18d8bd8010fe89d0057ff9199 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Tue, 21 May 2024 14:25:22 +0530 Subject: [PATCH 8/9] spotless apply Signed-off-by: Varun Bansal --- .../opensearch/index/shard/SegmentUploadFailedException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java index 40b151942f01b..9ab13f650c14b 100644 --- a/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java +++ b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java @@ -18,7 +18,7 @@ public class SegmentUploadFailedException extends IOException { /** * Creates a new SegmentUploadFailedException. 
* - * @param message error message + * @param message error message */ public SegmentUploadFailedException(String message) { super(message); From 251ecd08d9f959bd2327a0272d1640644c507468 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Wed, 22 May 2024 21:07:56 +0530 Subject: [PATCH 9/9] address PR comments Signed-off-by: Varun Bansal --- CHANGELOG.md | 1 + .../opensearch/index/shard/SegmentUploadFailedException.java | 2 ++ .../opensearch/index/shard/RemoteStoreRefreshListenerTests.java | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c905def74219..bc2e25c72ce94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423)) - Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478)) - Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293)) +- [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) diff --git a/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java index 9ab13f650c14b..bbff399fb71ff 100644 --- a/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java +++ b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java @@ -12,6 +12,8 @@ /** * Exception to be thrown when a segment 
upload fails. + * + * @opensearch.internal */ public class SegmentUploadFailedException extends IOException { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 217b9828e60bc..bb0776e0ced25 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -384,7 +384,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { } public void testSegmentUploadTimeout() throws Exception { - // This covers the case were segment upload fails due to timeout + // This covers the case where segment upload fails due to timeout int succeedOnAttempt = 1; // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);