diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index d3e592d0d4e5a..792f0058562f2 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -233,7 +233,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } // package private for testing - PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, RuntimeException { + PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException { try { if (transportService.getLocalNode().equals(request.getSourceNode())) { return acceptRemoteStateOnLocalNode(request); @@ -302,8 +302,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest } } catch (Exception e) { remoteClusterStateService.readMetadataFailed(); - if (e instanceof IOException) throw new IOException("IOException in reading remote cluster state", e); - throw new RuntimeException("Runtime exception in reading remote cluster state", e); + throw e; } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java index 8c1505ab5ed89..a43411b37d369 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteDownloadStats.java @@ -33,4 +33,11 @@ public void fullDownloadState() { fullDownloadCount.incrementAndGet(); } + public long getDiffDownloadCount() { + return diffDownloadCount.get(); + } + + public long getFullDownloadCount() { + return fullDownloadCount.get(); + } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index 08e3f47100d8c..c252b82c35875 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -44,7 +44,9 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.ClusterStateDiffManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteDownloadStats; import org.opensearch.node.Node; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; @@ -62,8 +64,12 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class PublicationTransportHandlerTests extends OpenSearchTestCase { @@ -160,7 +166,8 @@ public void testHandleIncomingRemotePublishRequestWhenNoCurrentPublishRequest() () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) ); assertThat(e.getMessage(), containsString("publication to self failed")); - Mockito.verifyNoInteractions(remoteClusterStateService); + verify(remoteClusterStateService, times(1)).readMetadataFailed(); + verifyNoMoreInteractions(remoteClusterStateService); } public void testHandleIncomingRemotePublishRequestWhenTermMismatch() { @@ -185,7 +192,8 @@ public void testHandleIncomingRemotePublishRequestWhenTermMismatch() { () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) ); assertThat(e.getMessage(), containsString("publication to self failed")); - Mockito.verifyNoInteractions(remoteClusterStateService); + verify(remoteClusterStateService, times(1)).readMetadataFailed(); + verifyNoMoreInteractions(remoteClusterStateService); } public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() { @@ -210,7 +218,8 @@ public void testHandleIncomingRemotePublishRequestWhenVersionMismatch() { () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest) ); assertThat(e.getMessage(), containsString("publication to self failed")); - Mockito.verifyNoInteractions(remoteClusterStateService); + verify(remoteClusterStateService, times(1)).readMetadataFailed(); + verifyNoMoreInteractions(remoteClusterStateService); } public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOException { @@ -235,6 +244,119 @@ public void testHandleIncomingRemotePublishRequestForLocalNode() throws IOExcept Mockito.verifyNoInteractions(remoteClusterStateService); } + public void testDownloadRemotePersistedFailedStats() throws IOException { + RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats(); + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + when(remoteClusterStateService.getDownloadStats()).thenReturn(remoteDownloadStats); + + doAnswer((i) -> { + remoteDownloadStats.stateFailed(); + return null; + }).when(remoteClusterStateService).readMetadataFailed(); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + ClusterState clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + + assertThrows(IllegalStateException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)); + assertEquals(1, remoteClusterStateService.getDownloadStats().getFailedCount()); + } + + public void testDownloadRemotePersistedDiffStats() throws IOException { + RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats(); + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + when(remoteClusterStateService.getDownloadStats()).thenReturn(remoteDownloadStats); + ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest( + new ClusterStateDiffManifest.Builder().fromStateUUID("state-uuid").build() + ).build(); + when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest); + + doAnswer((i) -> { + remoteDownloadStats.diffDownloadState(); + return null; + }).when(remoteClusterStateService).diffDownloadState(); + + doAnswer((i) -> { + remoteDownloadStats.fullDownloadState(); + return null; + }).when(remoteClusterStateService).fullDownloadState(); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + ClusterState clusterState = mock(ClusterState.class); + handler.setLastSeenClusterState(clusterState); + when(clusterState.stateUUID()).thenReturn("state-uuid"); + + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + assertThrows(NullPointerException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)); + assertEquals(1, remoteDownloadStats.getDiffDownloadCount()); + assertEquals(0, remoteDownloadStats.getFullDownloadCount()); + } + + public void testDownloadRemotePersistedFullStats() throws IOException { + RemoteDownloadStats remoteDownloadStats = new RemoteDownloadStats(); + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + when(remoteClusterStateService.getDownloadStats()).thenReturn(remoteDownloadStats); + ClusterMetadataManifest metadataManifest = new ClusterMetadataManifest.Builder().diffManifest( + new ClusterStateDiffManifest.Builder().fromStateUUID("state-uuid2").build() + ).build(); + when(remoteClusterStateService.getClusterMetadataManifestByFileName(any(), any())).thenReturn(metadataManifest); + + doAnswer((i) -> { + remoteDownloadStats.diffDownloadState(); + return null; + }).when(remoteClusterStateService).diffDownloadState(); + + doAnswer((i) -> { + remoteDownloadStats.fullDownloadState(); + return null; + }).when(remoteClusterStateService).fullDownloadState(); + + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + ClusterState clusterState = mock(ClusterState.class); + handler.setLastSeenClusterState(clusterState); + when(clusterState.stateUUID()).thenReturn("state-uuid"); + + RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + secondNode, + TERM, + VERSION, + CLUSTER_NAME, + CLUSTER_UUID, + MANIFEST_FILE + ); + clusterState = buildClusterState(TERM, VERSION); + PublishRequest publishRequest = new PublishRequest(clusterState); + handler.setCurrentPublishRequestToSelf(publishRequest); + assertThrows(NullPointerException.class, () -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)); + assertEquals(0, remoteDownloadStats.getDiffDownloadCount()); + assertEquals(1, remoteDownloadStats.getFullDownloadCount()); + } + public void testHandleIncomingRemotePublishRequestWhenManifestNotFound() throws IOException { RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 191d40862fe6d..71cb5ab0687b1 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -109,7 +109,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -837,7 +837,10 @@ public void testRemotePersistedStateFailureStats() throws IOException { final String previousClusterUUID = "prev-cluster-uuid"; Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any()); when(remoteClusterStateService.getUploadStats()).thenReturn(remoteStateStats); - doCallRealMethod().when(remoteClusterStateService).writeMetadataFailed(); + doAnswer((i) -> { + remoteStateStats.stateFailed(); + return null; + }).when(remoteClusterStateService).writeMetadataFailed(); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); final long clusterTerm = randomNonNegativeLong(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 4cbd0e2fdcf00..40919ca718a28 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -851,6 +851,10 @@ public void testGetClusterStateForManifest_IncludeEphemeral() throws IOException when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA); RemoteClusterStateService mockService = spy(remoteClusterStateService); mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); + + assertNotNull(remoteClusterStateService.getDownloadStats()); + assertEquals(1, remoteClusterStateService.getDownloadStats().getSuccessCount()); + assertEquals(0, remoteClusterStateService.getDownloadStats().getFailedCount()); verify(mockService, times(1)).readClusterStateInParallel( any(), eq(manifest), @@ -2568,7 +2572,7 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx assertThat(previousClusterUUID, equalTo("cluster-uuid2")); } - public void testRemoteStateStats() throws IOException { + public void testRemoteStateUploadStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start();