Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Remote Publication] Add remote download stats (opensearch-project#15291
Browse files Browse the repository at this point in the history
)

---------
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

(cherry picked from commit b54e867)
Arpit-Bandejiya committed Sep 2, 2024
1 parent 5653ed6 commit 38d0a82
Showing 15 changed files with 416 additions and 122 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -35,14 +35,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.16.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861), [#15205](https://github.com/opensearch-project/OpenSearch/pull/15205))
- OpenJDK Update (July 2024 Patch releases) ([#14998](https://github.com/opensearch-project/OpenSearch/pull/14998))
- Bump `com.microsoft.azure:msal4j` from 1.16.1 to 1.17.0 ([#14995](https://github.com/opensearch-project/OpenSearch/pull/14995), [#15420](https://github.com/opensearch-project/OpenSearch/pull/15420))
- Bump `actions/github-script` from 6 to 7 ([#14997](https://github.com/opensearch-project/OpenSearch/pull/14997))
- Bump `org.tukaani:xz` from 1.9 to 1.10 ([#15110](https://github.com/opensearch-project/OpenSearch/pull/15110))
- Bump `actions/setup-java` from 1 to 4 ([#15104](https://github.com/opensearch-project/OpenSearch/pull/15104))
- Bump `org.apache.avro:avro` from 1.11.3 to 1.12.0 in /plugins/repository-hdfs ([#15119](https://github.com/opensearch-project/OpenSearch/pull/15119))
- Bump `org.bouncycastle:bcpg-fips` from 1.0.7.1 to 2.0.9 ([#15103](https://github.com/opensearch-project/OpenSearch/pull/15103), [#15299](https://github.com/opensearch-project/OpenSearch/pull/15299))
- Bump `com.azure:azure-core` from 1.49.1 to 1.51.0 ([#15111](https://github.com/opensearch-project/OpenSearch/pull/15111))
@@ -81,13 +83,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix delete index template failed when the index template matches a data stream but is unused ([#15080](https://github.com/opensearch-project/OpenSearch/pull/15080))
- Fix array_index_out_of_bounds_exception when indexing documents with field name containing only dot ([#15126](https://github.com/opensearch-project/OpenSearch/pull/15126))
- Fixed array field name omission in flat_object function for nested JSON ([#13620](https://github.com/opensearch-project/OpenSearch/pull/13620))
- Fix range aggregation optimization ignoring top level queries ([#15194](https://github.com/opensearch-project/OpenSearch/pull/15194))
- Fix incorrect parameter names in MinHash token filter configuration handling ([#15233](https://github.com/opensearch-project/OpenSearch/pull/15233))
- Fix range aggregation optimization ignoring top level queries ([#15287](https://github.com/opensearch-project/OpenSearch/pull/15287))
- Fix indexing error when flat_object field is explicitly null ([#15375](https://github.com/opensearch-project/OpenSearch/pull/15375))
- Fix split response processor not included in allowlist ([#15393](https://github.com/opensearch-project/OpenSearch/pull/15393))
- Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394))
- Fix null values indexed as "null" strings in flat_object field ([#14069](https://github.com/opensearch-project/OpenSearch/pull/14069))

### Security

[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.16...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.15...2.x
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@@ -40,6 +41,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
@@ -253,11 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(discoveryStats.getClusterStateStats());
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
}
}
}

Original file line number Diff line number Diff line change
@@ -8,12 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
@@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

public void testRemotePublicationDownloadStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);

}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);

assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Original file line number Diff line number Diff line change
@@ -902,6 +902,10 @@ public DiscoveryStats stats() {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}
Original file line number Diff line number Diff line change
@@ -178,6 +178,14 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
ClusterState incomingState;
@@ -231,69 +239,78 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.trace(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
manifest,
lastSeen,
transportService.getLocalNode().getId()
try {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.debug(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
manifest,
lastSeen,
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
} catch (Exception e) {
if (applyFullState) {
remoteClusterStateService.fullDownloadFailed();
} else {
remoteClusterStateService.diffDownloadFailed();
}
throw e;
}
}

Original file line number Diff line number Diff line change
@@ -747,7 +747,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(

@Override
public PersistedStateStats getStats() {
return remoteClusterStateService.getStats();
return remoteClusterStateService.getUploadStats();
}

private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {
Original file line number Diff line number Diff line change
@@ -81,7 +81,7 @@ public RemoteClusterStateCleanupManager(
RemoteRoutingTableService remoteRoutingTableService
) {
this.remoteClusterStateService = remoteClusterStateService;
this.remoteStateStats = remoteClusterStateService.getStats();
this.remoteStateStats = remoteClusterStateService.getRemoteStateStats();
ClusterSettings clusterSettings = clusterService.getClusterSettings();
this.clusterApplierService = clusterService.getClusterApplierService();
this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING);
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.PersistedStateStats;
import org.opensearch.cluster.metadata.DiffableStringMap;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
@@ -258,8 +259,8 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
);

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateSucceeded();
remoteStateStats.stateTook(durationMillis);
remoteStateStats.stateUploadSucceeded();
remoteStateStats.stateUploadTook(durationMillis);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
"writing cluster state took [{}ms] which is above the warn threshold of [{}]; "
@@ -450,8 +451,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
);

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateSucceeded();
remoteStateStats.stateTook(durationMillis);
remoteStateStats.stateUploadSucceeded();
remoteStateStats.stateUploadTook(durationMillis);
ParameterizedMessage clusterStateUploadTimeMessage = new ParameterizedMessage(
CLUSTER_STATE_UPLOAD_TIME_LOG_STRING,
manifestDetails.getClusterMetadataManifest().getStateVersion(),
@@ -1315,8 +1316,10 @@ public ClusterState getClusterStateForManifest(
String localNodeId,
boolean includeEphemeral
) throws IOException {
final ClusterState clusterState;
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (manifest.onOrAfterCodecVersion(CODEC_V2)) {
return readClusterStateInParallel(
clusterState = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
@@ -1336,7 +1339,7 @@ public ClusterState getClusterStateForManifest(
includeEphemeral
);
} else {
ClusterState clusterState = readClusterStateInParallel(
ClusterState state = readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
manifest.getClusterUUID(),
@@ -1357,15 +1360,20 @@ public ClusterState getClusterStateForManifest(
false
);
Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest));
mb.indices(clusterState.metadata().indices());
return ClusterState.builder(clusterState).metadata(mb).build();
mb.indices(state.metadata().indices());
clusterState = ClusterState.builder(state).metadata(mb).build();
}
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateFullDownloadSucceeded();
remoteStateStats.stateFullDownloadTook(durationMillis);

return clusterState;
}

public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId)
throws IOException {
assert manifest.getDiffManifest() != null : "Diff manifest null which is required for downloading cluster state";
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
ClusterStateDiffManifest diff = manifest.getDiffManifest();
List<UploadedIndexMetadata> updatedIndices = diff.getIndicesUpdated().stream().map(idx -> {
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndices()
@@ -1441,11 +1449,17 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C
indexRoutingTables.remove(indexName);
}

return clusterStateBuilder.stateUUID(manifest.getStateUUID())
ClusterState clusterState = clusterStateBuilder.stateUUID(manifest.getStateUUID())
.version(manifest.getStateVersion())
.metadata(metadataBuilder)
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables))
.build();

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
remoteStateStats.stateDiffDownloadSucceeded();
remoteStateStats.stateDiffDownloadTook(durationMillis);

return clusterState;
}

/**
@@ -1641,10 +1655,30 @@ void setRemoteClusterStateAttributesManager(RemoteClusterStateAttributesManager
}

public void writeMetadataFailed() {
getStats().stateFailed();
remoteStateStats.stateUploadFailed();
}

public RemotePersistenceStats getStats() {
public RemotePersistenceStats getRemoteStateStats() {
return remoteStateStats;
}

public PersistedStateStats getUploadStats() {
return remoteStateStats.getUploadStats();
}

public PersistedStateStats getFullDownloadStats() {
return remoteStateStats.getRemoteFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteStateStats.getRemoteDiffDownloadStats();
}

public void fullDownloadFailed() {
remoteStateStats.stateFullDownloadFailed();
}

public void diffDownloadFailed() {
remoteStateStats.stateDiffDownloadFailed();
}
}
Original file line number Diff line number Diff line change
@@ -10,51 +10,96 @@

import org.opensearch.cluster.coordination.PersistedStateStats;

import java.util.concurrent.atomic.AtomicLong;

/**
* Remote state related extended stats.
*
* @opensearch.internal
*/
public class RemotePersistenceStats extends PersistedStateStats {
static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count";
static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count";
static final String INDICES_ROUTING_DIFF_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "indices_routing_diff_files_cleanup_attempt_failed_count";
static final String REMOTE_UPLOAD = "remote_upload";
private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0);
public class RemotePersistenceStats {

RemoteUploadStats remoteUploadStats;
PersistedStateStats remoteDiffDownloadStats;
PersistedStateStats remoteFullDownloadStats;

private AtomicLong indexRoutingFilesCleanupAttemptFailedCount = new AtomicLong(0);
private AtomicLong indicesRoutingDiffFilesCleanupAttemptFailedCount = new AtomicLong(0);
final String FULL_DOWNLOAD_STATS = "remote_full_download";
final String DIFF_DOWNLOAD_STATS = "remote_diff_download";

public RemotePersistenceStats() {
super(REMOTE_UPLOAD);
addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount);
addToExtendedFields(INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indexRoutingFilesCleanupAttemptFailedCount);
addToExtendedFields(INDICES_ROUTING_DIFF_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indicesRoutingDiffFilesCleanupAttemptFailedCount);
remoteUploadStats = new RemoteUploadStats();
remoteDiffDownloadStats = new PersistedStateStats(DIFF_DOWNLOAD_STATS);
remoteFullDownloadStats = new PersistedStateStats(FULL_DOWNLOAD_STATS);
}

public void cleanUpAttemptFailed() {
cleanupAttemptFailedCount.incrementAndGet();
remoteUploadStats.cleanUpAttemptFailed();
}

public long getCleanupAttemptFailedCount() {
return cleanupAttemptFailedCount.get();
return remoteUploadStats.getCleanupAttemptFailedCount();
}

public void indexRoutingFilesCleanupAttemptFailed() {
indexRoutingFilesCleanupAttemptFailedCount.incrementAndGet();
remoteUploadStats.indexRoutingFilesCleanupAttemptFailed();
}

public long getIndexRoutingFilesCleanupAttemptFailedCount() {
return indexRoutingFilesCleanupAttemptFailedCount.get();
return remoteUploadStats.getIndexRoutingFilesCleanupAttemptFailedCount();
}

public void indicesRoutingDiffFileCleanupAttemptFailed() {
indicesRoutingDiffFilesCleanupAttemptFailedCount.incrementAndGet();
remoteUploadStats.indicesRoutingDiffFileCleanupAttemptFailed();
}

public long getIndicesRoutingDiffFileCleanupAttemptFailedCount() {
return indicesRoutingDiffFilesCleanupAttemptFailedCount.get();
return remoteUploadStats.getIndicesRoutingDiffFileCleanupAttemptFailedCount();
}

public void stateUploadSucceeded() {
remoteUploadStats.stateSucceeded();
}

public void stateUploadTook(long durationMillis) {
remoteUploadStats.stateTook(durationMillis);
}

public void stateUploadFailed() {
remoteUploadStats.stateFailed();
}

public void stateFullDownloadSucceeded() {
remoteFullDownloadStats.stateSucceeded();
}

public void stateDiffDownloadSucceeded() {
remoteDiffDownloadStats.stateSucceeded();
}

public void stateFullDownloadTook(long durationMillis) {
remoteFullDownloadStats.stateTook(durationMillis);
}

public void stateDiffDownloadTook(long durationMillis) {
remoteDiffDownloadStats.stateTook(durationMillis);
}

public void stateFullDownloadFailed() {
remoteFullDownloadStats.stateFailed();
}

public void stateDiffDownloadFailed() {
remoteDiffDownloadStats.stateFailed();
}

public PersistedStateStats getUploadStats() {
return remoteUploadStats;
}

public PersistedStateStats getRemoteDiffDownloadStats() {
return remoteDiffDownloadStats;
}

public PersistedStateStats getRemoteFullDownloadStats() {
return remoteFullDownloadStats;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.cluster.coordination.PersistedStateStats;

import java.util.concurrent.atomic.AtomicLong;

/**
* Upload stats for remote state
*
* @opensearch.internal
*/
public class RemoteUploadStats extends PersistedStateStats {
static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count";
static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count";
static final String INDICES_ROUTING_DIFF_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "indices_routing_diff_files_cleanup_attempt_failed_count";
static final String REMOTE_UPLOAD = "remote_upload";
private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0);
private AtomicLong indexRoutingFilesCleanupAttemptFailedCount = new AtomicLong(0);
private AtomicLong indicesRoutingDiffFilesCleanupAttemptFailedCount = new AtomicLong(0);

public RemoteUploadStats() {
super(REMOTE_UPLOAD);
addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount);
addToExtendedFields(INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indexRoutingFilesCleanupAttemptFailedCount);
addToExtendedFields(INDICES_ROUTING_DIFF_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indicesRoutingDiffFilesCleanupAttemptFailedCount);
}

public void cleanUpAttemptFailed() {
cleanupAttemptFailedCount.incrementAndGet();
}

public long getCleanupAttemptFailedCount() {
return cleanupAttemptFailedCount.get();
}

public void indexRoutingFilesCleanupAttemptFailed() {
indexRoutingFilesCleanupAttemptFailedCount.incrementAndGet();
}

public long getIndexRoutingFilesCleanupAttemptFailedCount() {
return indexRoutingFilesCleanupAttemptFailedCount.get();
}

public void indicesRoutingDiffFileCleanupAttemptFailed() {
indicesRoutingDiffFilesCleanupAttemptFailedCount.incrementAndGet();
}

public long getIndicesRoutingDiffFileCleanupAttemptFailedCount() {
return indicesRoutingDiffFilesCleanupAttemptFailedCount.get();
}
}
Original file line number Diff line number Diff line change
@@ -800,7 +800,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) throws IOExcep
: null;
ClusterStateStats stateStats = new ClusterStateStats();
RemotePersistenceStats remoteStateStats = new RemotePersistenceStats();
stateStats.setPersistenceStats(Arrays.asList(remoteStateStats));
stateStats.setPersistenceStats(Arrays.asList(remoteStateStats.getUploadStats()));
DiscoveryStats discoveryStats = frequently()
? new DiscoveryStats(
randomBoolean() ? new PendingClusterStateStats(randomInt(), randomInt(), randomInt()) : null,
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.gateway.GatewayMetaState.RemotePersistedState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterStateDiffManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
@@ -74,8 +75,12 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class PublicationTransportHandlerTests extends OpenSearchTestCase {
@@ -175,7 +180,9 @@ public void testHandleIncomingRemotePublishRequestWhenNoCurrentPublishRequest()
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
);
assertThat(e.getMessage(), containsString("publication to self failed"));
Mockito.verifyNoInteractions(remoteClusterStateService);
verify(remoteClusterStateService, times(0)).fullDownloadFailed();
verify(remoteClusterStateService, times(1)).diffDownloadFailed();
verifyNoMoreInteractions(remoteClusterStateService);
}

public void testHandleIncomingRemotePublishRequestWhenTermMismatch() {
@@ -200,7 +207,9 @@ public void testHandleIncomingRemotePublishRequestWhenTermMismatch() {
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
);
assertThat(e.getMessage(), containsString("publication to self failed"));
Mockito.verifyNoInteractions(remoteClusterStateService);
verify(remoteClusterStateService, times(0)).fullDownloadFailed();
verify(remoteClusterStateService, times(1)).diffDownloadFailed();
verifyNoMoreInteractions(remoteClusterStateService);
}

public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() {
@@ -225,7 +234,9 @@ public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() {
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
);
assertThat(e.getMessage(), containsString("publication to self failed"));
Mockito.verifyNoInteractions(remoteClusterStateService);
verify(remoteClusterStateService, times(1)).diffDownloadFailed();
verify(remoteClusterStateService, times(0)).fullDownloadFailed();
verifyNoMoreInteractions(remoteClusterStateService);
}

public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOException {
@@ -250,6 +261,82 @@ public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOExcept
Mockito.verifyNoInteractions(remoteClusterStateService);
}

public void testDownloadRemotePersistedFullStateFailedStats() throws IOException {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
PersistedStateStats remoteFullDownloadStats = new PersistedStateStats("dummy_full_stats");
PersistedStateStats remoteDiffDownloadStats = new PersistedStateStats("dummy_diff_stats");
when(remoteClusterStateService.getFullDownloadStats()).thenReturn(remoteFullDownloadStats);
when(remoteClusterStateService.getDiffDownloadStats()).thenReturn(remoteDiffDownloadStats);

doAnswer((i) -> {
remoteFullDownloadStats.stateFailed();
return null;
}).when(remoteClusterStateService).fullDownloadFailed();

doAnswer((i) -> {
remoteDiffDownloadStats.stateFailed();
return null;
}).when(remoteClusterStateService).diffDownloadFailed();

PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
secondNode,
TERM,
VERSION,
CLUSTER_NAME,
CLUSTER_UUID,
MANIFEST_FILE
);
ClusterState clusterState = buildClusterState(TERM, VERSION);
PublishRequest publishRequest = new PublishRequest(clusterState);
handler.setCurrentPublishRequestToSelf(publishRequest);

assertThrows(IllegalStateException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest));
assertEquals(1, remoteClusterStateService.getDiffDownloadStats().getFailedCount());
assertEquals(0, remoteClusterStateService.getFullDownloadStats().getFailedCount());
}

public void testDownloadRemotePersistedDiffStateFailedStats() throws IOException {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
PersistedStateStats remoteDiffDownloadStats = new PersistedStateStats("dummy_stats");
when(remoteClusterStateService.getDiffDownloadStats()).thenReturn(remoteDiffDownloadStats);

ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest(
new ClusterStateDiffManifest.Builder().fromStateUUID("state-uuid").build()
).build();
when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest);

doAnswer((i) -> {
remoteDiffDownloadStats.stateFailed();
return null;
}).when(remoteClusterStateService).diffDownloadFailed();

PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.nodes()).thenReturn(mock(DiscoveryNodes.class));
handler.setLastSeenClusterState(clusterState);
when(clusterState.stateUUID()).thenReturn("state-uuid");
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
secondNode,
TERM,
VERSION,
CLUSTER_NAME,
CLUSTER_UUID,
MANIFEST_FILE
);
clusterState = buildClusterState(TERM, VERSION);
PublishRequest publishRequest = new PublishRequest(clusterState);
handler.setCurrentPublishRequestToSelf(publishRequest);

assertThrows(NullPointerException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest));
assertEquals(1, remoteClusterStateService.getDiffDownloadStats().getFailedCount());

}

public void testHandleIncomingRemotePublishRequestWhenManifestNotFound() throws IOException {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);

Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@
import org.opensearch.gateway.PersistedClusterStateService.Writer;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemotePersistenceStats;
import org.opensearch.gateway.remote.RemoteUploadStats;
import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
@@ -112,7 +112,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -899,14 +899,17 @@ public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOExcept
}

public void testRemotePersistedStateFailureStats() throws IOException {
RemotePersistenceStats remoteStateStats = new RemotePersistenceStats();
RemoteUploadStats remoteStateStats = new RemoteUploadStats();
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final String previousClusterUUID = "prev-cluster-uuid";
Mockito.doThrow(IOException.class)
.when(remoteClusterStateService)
.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION));
when(remoteClusterStateService.getStats()).thenReturn(remoteStateStats);
doCallRealMethod().when(remoteClusterStateService).writeMetadataFailed();
when(remoteClusterStateService.getUploadStats()).thenReturn(remoteStateStats);
doAnswer((i) -> {
remoteStateStats.stateFailed();
return null;
}).when(remoteClusterStateService).writeMetadataFailed();
CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID);

final long clusterTerm = randomNonNegativeLong();
@@ -916,8 +919,8 @@ public void testRemotePersistedStateFailureStats() throws IOException {
);

assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(clusterState));
assertEquals(1, remoteClusterStateService.getStats().getFailedCount());
assertEquals(0, remoteClusterStateService.getStats().getSuccessCount());
assertEquals(1, remoteClusterStateService.getUploadStats().getFailedCount());
assertEquals(0, remoteClusterStateService.getUploadStats().getSuccessCount());
}

public void testGatewayForRemoteState() throws IOException {
Original file line number Diff line number Diff line change
@@ -144,7 +144,7 @@ public void setup() {
remoteManifestManager = mock(RemoteManifestManager.class);
remoteClusterStateService = mock(RemoteClusterStateService.class);
when(remoteClusterStateService.getRemoteManifestManager()).thenReturn(remoteManifestManager);
when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats());
when(remoteClusterStateService.getRemoteStateStats()).thenReturn(new RemotePersistenceStats());
when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool);
when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
when(remoteClusterStateService.getBlobStoreRepository()).thenReturn(blobStoreRepository);
@@ -503,7 +503,7 @@ public void testRemoteStateCleanupFailureStats() throws IOException {
assertBusy(() -> {
// wait for stats to get updated
assertTrue(remoteClusterStateCleanupManager.getStats() != null);
assertEquals(0, remoteClusterStateCleanupManager.getStats().getSuccessCount());
assertEquals(0, remoteClusterStateCleanupManager.getStats().getUploadStats().getSuccessCount());
assertEquals(1, remoteClusterStateCleanupManager.getStats().getCleanupAttemptFailedCount());
});
} catch (Exception e) {
Original file line number Diff line number Diff line change
@@ -575,7 +575,7 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx
RemoteStateTransferException.class,
() -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10), MANIFEST_CURRENT_CODEC_VERSION)
);
assertEquals(0, remoteClusterStateService.getStats().getSuccessCount());
assertEquals(0, remoteClusterStateService.getUploadStats().getSuccessCount());
}

public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException {
@@ -587,7 +587,7 @@ public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOExc
null
);
Assert.assertThat(manifestDetails, nullValue());
assertEquals(0, remoteClusterStateService.getStats().getSuccessCount());
assertEquals(0, remoteClusterStateService.getUploadStats().getSuccessCount());
}

public void testFailWriteIncrementalMetadataWhenTermChanged() {
@@ -861,6 +861,10 @@ public void testGetClusterStateForManifest_IncludeEphemeral() throws IOException
when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA);
RemoteClusterStateService mockService = spy(remoteClusterStateService);
mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true);

assertNotNull(remoteClusterStateService.getFullDownloadStats());
assertEquals(1, remoteClusterStateService.getFullDownloadStats().getSuccessCount());
assertEquals(0, remoteClusterStateService.getFullDownloadStats().getFailedCount());
verify(mockService, times(1)).readClusterStateInParallel(
any(),
eq(manifest),
@@ -2590,7 +2594,7 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx
assertThat(previousClusterUUID, equalTo("cluster-uuid2"));
}

public void testRemoteStateStats() throws IOException {
public void testRemoteStateUploadStats() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
remoteClusterStateService.start();
@@ -2600,10 +2604,10 @@ public void testRemoteStateStats() throws IOException {
MANIFEST_CURRENT_CODEC_VERSION
).getClusterMetadataManifest();

assertTrue(remoteClusterStateService.getStats() != null);
assertEquals(1, remoteClusterStateService.getStats().getSuccessCount());
assertEquals(0, remoteClusterStateService.getStats().getCleanupAttemptFailedCount());
assertEquals(0, remoteClusterStateService.getStats().getFailedCount());
assertTrue(remoteClusterStateService.getUploadStats() != null);
assertEquals(1, remoteClusterStateService.getUploadStats().getSuccessCount());
assertEquals(0, remoteClusterStateService.getRemoteStateStats().getCleanupAttemptFailedCount());
assertEquals(0, remoteClusterStateService.getUploadStats().getFailedCount());
}

public void testRemoteRoutingTableNotInitializedWhenDisabled() {

0 comments on commit 38d0a82

Please sign in to comment.