Skip to content

Commit

Permalink
Merge branch '2.x' into backport-15362-to-2.x
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis authored Oct 21, 2024
2 parents b99a0ac + 41d582e commit b202427
Show file tree
Hide file tree
Showing 50 changed files with 2,137 additions and 271 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
- Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383))
- Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387)
- Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858))
Expand Down Expand Up @@ -86,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix wrong default value when setting `index.number_of_routing_shards` to null on index creation ([#16331](https://github.com/opensearch-project/OpenSearch/pull/16331))
- [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370)
- Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362))
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -51,9 +50,9 @@ public void testCatShardsWithSuccessResponse() throws InterruptedException {
client().execute(CatShardsAction.INSTANCE, shardsRequest, new ActionListener<CatShardsResponse>() {
@Override
public void onResponse(CatShardsResponse catShardsResponse) {
ClusterStateResponse clusterStateResponse = catShardsResponse.getClusterStateResponse();
List<ShardRouting> shardRoutings = catShardsResponse.getResponseShards();
IndicesStatsResponse indicesStatsResponse = catShardsResponse.getIndicesStatsResponse();
for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
for (ShardRouting shard : shardRoutings) {
assertEquals("test", shard.getIndexName());
assertNotNull(indicesStatsResponse.asMap().get(shard));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ public void testContinuousIndexing() throws Exception {
int numDocs = randomIntBetween(200, 300);
totalDocs += numDocs;
try (BackgroundIndexer indexer = new BackgroundIndexer(index, MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) {
int numberOfSnapshots = 5;
int numberOfSnapshots = 2;
for (int i = 0; i < numberOfSnapshots; i++) {
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
long finalTotalDocs1 = totalDocs;
Expand Down Expand Up @@ -976,4 +976,112 @@ public void testContinuousIndexing() throws Exception {
});
}
}

public void testHashedPrefixTranslogMetadataCombination() throws Exception {
Settings settings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(RemoteStoreEnums.PathType.values()))
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), randomBoolean())
.build();

internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNode(settings);
String index = "test-index";
String snapshotRepo = "test-restore-snapshot-repo";
String baseSnapshotName = "snapshot_";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true));

Client client = client();
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

createIndex(index, indexSettings);
ensureGreen(index);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(index)
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(randomIntBetween(1, 5)));
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(randomIntBetween(1, 5)));

long totalDocs = 0;
Map<String, Long> snapshots = new HashMap<>();
int numDocs = randomIntBetween(200, 300);
totalDocs += numDocs;
try (BackgroundIndexer indexer = new BackgroundIndexer(index, MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) {
int numberOfSnapshots = 2;
for (int i = 0; i < numberOfSnapshots; i++) {
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
long finalTotalDocs1 = totalDocs;
assertBusy(() -> assertEquals(finalTotalDocs1, indexer.totalIndexedDocs()), 120, TimeUnit.SECONDS);
logger.info("--> {} total docs indexed", totalDocs);
String snapshotName = baseSnapshotName + i;
createSnapshot(snapshotRepo, snapshotName, new ArrayList<>());
snapshots.put(snapshotName, totalDocs);
if (i < numberOfSnapshots - 1) {
numDocs = randomIntBetween(200, 300);
indexer.continueIndexing(numDocs);
totalDocs += numDocs;
}
}
}

logger.info("Snapshots Status: " + snapshots);

for (String snapshot : snapshots.keySet()) {
logger.info("Restoring snapshot: {}", snapshot);

if (randomBoolean()) {
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(index)).get());
} else {
assertAcked(client().admin().indices().prepareClose(index));
}

assertTrue(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder()
.put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(RemoteStoreEnums.PathType.values()))
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), randomBoolean())
)
.get()
.isAcknowledged()
);

RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepo, snapshot)
.setWaitForCompletion(true)
.setIndices()
.get();

assertEquals(RestStatus.OK, restoreSnapshotResponse1.status());

// Verify restored index's stats
ensureGreen(TimeValue.timeValueSeconds(60), index);
long finalTotalDocs = totalDocs;
assertBusy(() -> {
Long hits = client().prepareSearch(index)
.setQuery(matchAllQuery())
.setSize((int) finalTotalDocs)
.storedFields()
.execute()
.actionGet()
.getHits()
.getTotalHits().value;

assertEquals(snapshots.get(snapshot), hits);
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.apache.lucene.index;

import org.apache.lucene.codecs.DocValuesProducer;

import java.util.Collections;
import java.util.Set;

/**
* Utility class for DocValuesProducers
* @opensearch.internal
*/
public class DocValuesProducerUtil {
/**
* Returns the segment doc values producers for the given doc values producer.
* If the given doc values producer is not a segment doc values producer, an empty set is returned.
* @param docValuesProducer the doc values producer
* @return the segment doc values producers
*/
public static Set<DocValuesProducer> getSegmentDocValuesProducers(DocValuesProducer docValuesProducer) {
if (docValuesProducer instanceof SegmentDocValuesProducer) {
return (((SegmentDocValuesProducer) docValuesProducer).dvProducers);
}
return Collections.emptySet();
}
}
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@
import org.opensearch.rest.action.list.AbstractListAction;
import org.opensearch.rest.action.list.RestIndicesListAction;
import org.opensearch.rest.action.list.RestListAction;
import org.opensearch.rest.action.list.RestShardsListAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePitAction;
Expand Down Expand Up @@ -979,6 +980,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

// LIST API
registerHandler.accept(new RestIndicesListAction(responseLimitSettings));
registerHandler.accept(new RestShardsListAction());

// Point in time API
registerHandler.accept(new RestCreatePitAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.pagination.PageParams;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.rest.action.admin.cluster.ClusterAdminTask;

Expand All @@ -27,13 +30,39 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq

private String[] indices;
private TimeValue cancelAfterTimeInterval;
private PageParams pageParams = null;
private boolean requestLimitCheckSupported;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
super(in);
this.requestLimitCheckSupported = false;
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
indices = in.readStringArray();
cancelAfterTimeInterval = in.readOptionalTimeValue();
if (in.readBoolean()) {
pageParams = new PageParams(in);
}
requestLimitCheckSupported = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
if (indices == null) {
out.writeVInt(0);
} else {
out.writeStringArray(indices);
}
out.writeOptionalTimeValue(cancelAfterTimeInterval);
out.writeBoolean(pageParams != null);
if (pageParams != null) {
pageParams.writeTo(out);
}
out.writeBoolean(requestLimitCheckSupported);
}
}

@Override
Expand All @@ -57,6 +86,14 @@ public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;
}

public void setPageParams(PageParams pageParams) {
this.pageParams = pageParams;
}

public PageParams getPageParams() {
return pageParams;
}

public void setRequestLimitCheckSupported(final boolean requestLimitCheckSupported) {
this.requestLimitCheckSupported = requestLimitCheckSupported;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.pagination.PageToken;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A response of a cat shards request.
Expand All @@ -23,28 +28,44 @@
*/
public class CatShardsResponse extends ActionResponse {

private ClusterStateResponse clusterStateResponse = null;

private IndicesStatsResponse indicesStatsResponse = null;
private IndicesStatsResponse indicesStatsResponse;
private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
private List<ShardRouting> responseShards = new ArrayList<>();
private PageToken pageToken;

public CatShardsResponse() {}

public CatShardsResponse(StreamInput in) throws IOException {
super(in);
indicesStatsResponse = new IndicesStatsResponse(in);
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
nodes = DiscoveryNodes.readFrom(in, null);
responseShards = in.readList(ShardRouting::new);
if (in.readBoolean()) {
pageToken = new PageToken(in);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
clusterStateResponse.writeTo(out);
indicesStatsResponse.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
nodes.writeToWithAttribute(out);
out.writeList(responseShards);
out.writeBoolean(pageToken != null);
if (pageToken != null) {
pageToken.writeTo(out);
}
}
}

public void setClusterStateResponse(ClusterStateResponse clusterStateResponse) {
this.clusterStateResponse = clusterStateResponse;
public void setNodes(DiscoveryNodes nodes) {
this.nodes = nodes;
}

public ClusterStateResponse getClusterStateResponse() {
return this.clusterStateResponse;
public DiscoveryNodes getNodes() {
return this.nodes;
}

public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
Expand All @@ -54,4 +75,20 @@ public void setIndicesStatsResponse(IndicesStatsResponse indicesStatsResponse) {
public IndicesStatsResponse getIndicesStatsResponse() {
return this.indicesStatsResponse;
}

public void setResponseShards(List<ShardRouting> responseShards) {
this.responseShards = responseShards;
}

public List<ShardRouting> getResponseShards() {
return this.responseShards;
}

public void setPageToken(PageToken pageToken) {
this.pageToken = pageToken;
}

public PageToken getPageToken() {
return this.pageToken;
}
}
Loading

0 comments on commit b202427

Please sign in to comment.