Skip to content

Commit

Permalink
Add an assertion number of results to be equal to upload tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Jul 18, 2024
1 parent c6d033c commit ddd2fa8
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity en
* Returns an ActionListener for handling the write operation for the specified component, remote object, and latched action listener.
*
* @param component the component for which the write operation is performed
* @param remoteObject the remote object to be written
* @param remoteEntity the remote object to be written
* @param listener the listener to be notified when the write operation completes
* @return an ActionListener for handling the write operation
*/
protected abstract ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
);

Expand All @@ -58,13 +58,13 @@ protected abstract ActionListener<Void> getWriteActionListener(
* remote object, and latched action listener.
*
* @param component the component for which the read operation is performed
* @param remoteObject the remote object to be read
* @param remoteEntity the remote object to be read
* @param listener the listener to be notified when the read operation completes
* @return an ActionListener for handling the read operation
*/
protected abstract ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,24 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableE
@Override
protected ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return ActionListener.wrap(
resp -> listener.onResponse(remoteObject.getUploadedMetadata()),
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex))
resp -> listener.onResponse(remoteEntity.getUploadedMetadata()),
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex))
);
}

@Override
protected ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
) {
return ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex))
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,16 @@ UploadedMetadataResults writeMetadataInParallel(
exceptionList.forEach(exception::addSuppressed);
throw exception;
}
if (results.size() != uploadTasks.size()) {
throw new RemoteStateTransferException(
String.format(
Locale.ROOT,
"Some metadata components were not uploaded successfully. Objects to be uploaded: %s, uploaded objects: %s",
String.join(", ", uploadTasks),
String.join(", ", results.keySet())
)
);
}
UploadedMetadataResults response = new UploadedMetadataResults();
results.forEach((name, uploadedMetadata) -> {
if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,24 +159,24 @@ public class RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityMan
@Override
protected ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return ActionListener.wrap(
resp -> listener.onResponse(remoteObject.getUploadedMetadata()),
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex))
resp -> listener.onResponse(remoteEntity.getUploadedMetadata()),
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex))
);
}

@Override
protected ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
) {
return ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult(response, remoteObject.getType(), component)),
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex))
response -> listener.onResponse(new RemoteReadResult(response, remoteEntity.getType(), component)),
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,24 @@ private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeo
@Override
protected ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return ActionListener.wrap(
resp -> listener.onResponse(remoteObject.getUploadedMetadata()),
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex))
resp -> listener.onResponse(remoteEntity.getUploadedMetadata()),
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex))
);
}

@Override
protected ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
) {
return ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult(response, RemoteIndexMetadata.INDEX, component)),
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex))
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private static class ConcreteRemoteWritableEntityManager extends AbstractRemoteW
@Override
protected ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return null;
Expand All @@ -55,7 +55,7 @@ protected ActionListener<Void> getWriteActionListener(
@Override
protected ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
AbstractRemoteWritableBlobEntity remoteEntity,
ActionListener<RemoteReadResult> listener
) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.ParseField;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -80,6 +81,7 @@
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.rmi.Remote;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -121,6 +123,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT;
import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT;
import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
Expand Down Expand Up @@ -590,6 +593,52 @@ public void testFailWriteIncrementalMetadataWhenTermChanged() {
);
}

public void testWriteMetadataInParallelIncompleteUpload() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
final RemoteClusterStateService rcssSpy = Mockito.spy(remoteClusterStateService);
rcssSpy.start();
RemoteIndexMetadataManager mockedIndexManager = mock(RemoteIndexMetadataManager.class);
RemoteGlobalMetadataManager mockedGlobalMetadataManager = mock(RemoteGlobalMetadataManager.class);
RemoteClusterStateAttributesManager mockedClusterStateAttributeManager = mock(RemoteClusterStateAttributesManager.class);
ClusterMetadataManifest.UploadedMetadata mockedUploadedMetadata = mock(ClusterMetadataManifest.UploadedMetadata.class);
rcssSpy.setRemoteIndexMetadataManager(mockedIndexManager);
rcssSpy.setRemoteGlobalMetadataManager(mockedGlobalMetadataManager);
rcssSpy.setRemoteClusterStateAttributesManager(mockedClusterStateAttributeManager);
ArgumentCaptor<LatchedActionListener> listenerArgumentCaptor = ArgumentCaptor.forClass(LatchedActionListener.class);

when(mockedGlobalMetadataManager.getGlobalMetadataUploadTimeout()).thenReturn(GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT);
when(mockedUploadedMetadata.getComponent()).thenReturn("test-component");
doAnswer(invocation -> {
listenerArgumentCaptor.getValue().onResponse(mockedUploadedMetadata);
return null;
}).when(mockedIndexManager).writeAsync(any(), any(), listenerArgumentCaptor.capture());
doAnswer(invocation -> {
listenerArgumentCaptor.getValue().onResponse(mockedUploadedMetadata);
return null;
}).when(mockedGlobalMetadataManager).writeAsync(anyString(), any(), listenerArgumentCaptor.capture());
doAnswer(invocation -> {
listenerArgumentCaptor.getValue().onResponse(mockedUploadedMetadata);
return null;
}).when(mockedClusterStateAttributeManager).writeAsync(any(), any(), listenerArgumentCaptor.capture());

RemoteStateTransferException exception = expectThrows(RemoteStateTransferException.class, () -> rcssSpy.writeMetadataInParallel(
clusterState,
new ArrayList<>(clusterState.getMetadata().indices().values()),
emptyMap(),
clusterState.getMetadata().customs(),
true,
true,
true,
true,
true,
true,
clusterState.getCustoms(),
true,
emptyList()
));
assertTrue(exception.getMessage().startsWith("Some metadata components were not uploaded successfully"));
}

public void testWriteIncrementalMetadataSuccess() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
Expand Down

0 comments on commit ddd2fa8

Please sign in to comment.