Skip to content

Commit

Permalink
Clear templates before Adding; Use NamedWriteableAwareStreamInput for…
Browse files Browse the repository at this point in the history
… RemoteCustomMetadata; Correct the check for deciding upload of HashesOfConsistentSettings (opensearch-project#14513)

* Clear templates before Adding; Use NamedWriteableAwareStreamInput for RemoteCustomMetadata
* Correct the check for deciding upload of hashes of consistent settings

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha authored and wangdongyu.danny committed Aug 22, 2024
1 parent 222f203 commit b296cf0
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,7 @@ public Builder templates(Map<String, IndexTemplateMetadata> templates) {
}

public Builder templates(TemplatesMetadata templatesMetadata) {
this.templates.clear();
this.templates.putAll(templatesMetadata.getTemplates());
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
|| Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
&& Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

uploadedMetadataResults = writeMetadataInParallel(
clusterState,
Expand Down Expand Up @@ -476,7 +476,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
return manifestDetails;
}

private UploadedMetadataResults writeMetadataInParallel(
// package private for testing
UploadedMetadataResults writeMetadataInParallel(
ClusterState clusterState,
List<IndexMetadata> indexToUpload,
Map<String, IndexMetadata> prevIndexMetadataByName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.compress.Compressor;
Expand Down Expand Up @@ -122,6 +123,8 @@ public UploadedMetadata getUploadedMetadata() {

public static Custom readFrom(StreamInput streamInput, NamedWriteableRegistry namedWriteableRegistry, String customType)
throws IOException {
return namedWriteableRegistry.getReader(Custom.class, customType).read(streamInput);
try (StreamInput in = new NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry)) {
return namedWriteableRegistry.getReader(Custom.class, customType).read(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,33 @@ public void testIsSegmentReplicationDisabled() {
assertFalse(metadata.isSegmentReplicationEnabled(indexName));
}

public void testTemplatesMetadata() {
TemplatesMetadata templatesMetadata1 = TemplatesMetadata.builder()
.put(
IndexTemplateMetadata.builder("template_1")
.patterns(Arrays.asList("bar-*", "foo-*"))
.settings(Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build())
.build()
)
.build();
Metadata metadata1 = Metadata.builder().templates(templatesMetadata1).build();
assertThat(metadata1.templates(), is(templatesMetadata1.getTemplates()));

TemplatesMetadata templatesMetadata2 = TemplatesMetadata.builder()
.put(
IndexTemplateMetadata.builder("template_2")
.patterns(Arrays.asList("bar-*", "foo-*"))
.settings(Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build())
.build()
)
.build();

Metadata metadata2 = Metadata.builder(metadata1).templates(templatesMetadata2).build();

assertThat(metadata2.templates(), is(templatesMetadata2.getTemplates()));

}

public static Metadata randomMetadata() {
Metadata.Builder md = Metadata.builder()
.put(buildIndexMetadata("index", "alias", randomBoolean() ? null : randomBoolean()).build(), randomBoolean())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService;
Expand Down Expand Up @@ -92,6 +93,7 @@

import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

import static java.util.stream.Collectors.toList;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
Expand All @@ -111,13 +113,15 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -518,11 +522,13 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build();

remoteClusterStateService.start();
final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(
final RemoteClusterStateService rcssSpy = Mockito.spy(remoteClusterStateService);
final RemoteClusterStateManifestInfo manifestInfo = rcssSpy.writeIncrementalMetadata(
previousClusterState,
clusterState,
previousManifest
).getClusterMetadataManifest();
);
final ClusterMetadataManifest manifest = manifestInfo.getClusterMetadataManifest();
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename__2");
final List<UploadedIndexMetadata> indices = List.of(uploadedIndexMetadata);

Expand All @@ -535,6 +541,24 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
.previousClusterUUID("prev-cluster-uuid")
.build();

Mockito.verify(rcssSpy)
.writeMetadataInParallel(
eq(clusterState),
eq(new ArrayList<IndexMetadata>(clusterState.metadata().indices().values())),
eq(Collections.singletonMap(indices.get(0).getIndexName(), null)),
eq(clusterState.metadata().customs()),
eq(true),
eq(true),
eq(true),
eq(false),
eq(false),
eq(false),
eq(Collections.emptyMap()),
eq(false),
eq(Collections.emptyList())
);

assertThat(manifestInfo.getManifestFileName(), notNullValue());
assertThat(manifest.getIndices().size(), is(1));
assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
Expand All @@ -543,6 +567,95 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion()));
assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID()));
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
assertThat(manifest.getHashesOfConsistentSettings(), nullValue());
assertThat(manifest.getDiscoveryNodesMetadata(), nullValue());
assertThat(manifest.getClusterBlocksMetadata(), nullValue());
assertThat(manifest.getClusterStateCustomMap(), anEmptyMap());
assertThat(manifest.getTransientSettingsMetadata(), nullValue());
assertThat(manifest.getTemplatesMetadata(), notNullValue());
assertThat(manifest.getCoordinationMetadata(), notNullValue());
assertThat(manifest.getCustomMetadataMap().size(), is(2));
assertThat(manifest.getIndicesRouting().size(), is(0));
}

public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws IOException {
publicationEnabled = true;
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, publicationEnabled).build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
remoteClusterStateService = new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
settings,
clusterService,
() -> 0L,
threadPool,
List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)),
writableRegistry()
);
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().coordinationMetadata(coordinationMetadata))
.build();

final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build();

remoteClusterStateService.start();
final RemoteClusterStateService rcssSpy = Mockito.spy(remoteClusterStateService);
final RemoteClusterStateManifestInfo manifestInfo = rcssSpy.writeIncrementalMetadata(
previousClusterState,
clusterState,
previousManifest
);
final ClusterMetadataManifest manifest = manifestInfo.getClusterMetadataManifest();
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename__2");
final List<UploadedIndexMetadata> indices = List.of(uploadedIndexMetadata);

final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
.indices(indices)
.clusterTerm(1L)
.stateVersion(1L)
.stateUUID("state-uuid")
.clusterUUID("cluster-uuid")
.previousClusterUUID("prev-cluster-uuid")
.build();

Mockito.verify(rcssSpy)
.writeMetadataInParallel(
eq(clusterState),
eq(new ArrayList<IndexMetadata>(clusterState.metadata().indices().values())),
eq(Collections.singletonMap(indices.get(0).getIndexName(), null)),
eq(clusterState.metadata().customs()),
eq(true),
eq(true),
eq(true),
eq(true),
eq(false),
eq(false),
eq(Collections.emptyMap()),
eq(true),
Mockito.anyList()
);

assertThat(manifestInfo.getManifestFileName(), notNullValue());
assertThat(manifest.getIndices().size(), is(1));
assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue());
assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm()));
assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion()));
assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID()));
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
assertThat(manifest.getHashesOfConsistentSettings(), notNullValue());
assertThat(manifest.getDiscoveryNodesMetadata(), notNullValue());
assertThat(manifest.getClusterBlocksMetadata(), nullValue());
assertThat(manifest.getClusterStateCustomMap(), anEmptyMap());
assertThat(manifest.getTransientSettingsMetadata(), nullValue());
assertThat(manifest.getTemplatesMetadata(), notNullValue());
assertThat(manifest.getCoordinationMetadata(), notNullValue());
assertThat(manifest.getCustomMetadataMap().size(), is(2));
assertThat(manifest.getIndicesRouting().size(), is(1));
}

/*
Expand Down Expand Up @@ -2012,7 +2125,9 @@ static ClusterState.Builder generateClusterStateWithOneIndex() {
.build();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
final Settings settings = Settings.builder().put("mock-settings", true).build();
final TemplatesMetadata templatesMetadata = TemplatesMetadata.EMPTY_METADATA;
final TemplatesMetadata templatesMetadata = TemplatesMetadata.builder()
.put(IndexTemplateMetadata.builder("template1").settings(idxSettings).patterns(List.of("test*")).build())
.build();
final CustomMetadata1 customMetadata1 = new CustomMetadata1("custom-metadata-1");
return ClusterState.builder(ClusterName.DEFAULT)
.version(1L)
Expand All @@ -2025,14 +2140,16 @@ static ClusterState.Builder generateClusterStateWithOneIndex() {
.coordinationMetadata(coordinationMetadata)
.persistentSettings(settings)
.templates(templatesMetadata)
.hashesOfConsistentSettings(Map.of("key1", "value1", "key2", "value2"))
.putCustom(customMetadata1.getWriteableName(), customMetadata1)
.build()
)
.routingTable(RoutingTable.builder().addAsNew(indexMetadata).version(1L).build());
}

static DiscoveryNodes nodesWithLocalNodeClusterManager() {
return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build();
final DiscoveryNode localNode = new DiscoveryNode("cluster-manager-id", buildNewFakeTransportAddress(), Version.CURRENT);
return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").add(localNode).build();
}

private static class CustomMetadata1 extends TestCustomMetadata {
Expand Down
Loading

0 comments on commit b296cf0

Please sign in to comment.