Skip to content

Commit

Permalink
Add max_shard_size parameter for Shrink API
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Binlong <gbinlong@amazon.com>
  • Loading branch information
gaobinlong committed Nov 12, 2022
1 parent 831b3a0 commit 5ef329a
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -58,6 +59,7 @@ public class ResizeRequest extends TimedRequest implements Validatable, ToXConte
private final String targetIndex;
private Settings settings = Settings.EMPTY;
private Set<Alias> aliases = new HashSet<>();
private ByteSizeValue maxShardSize;

/**
* Creates a new resize request
Expand Down Expand Up @@ -155,6 +157,24 @@ public ActiveShardCount getWaitForActiveShards() {
return waitForActiveShards;
}

/**
* Sets the maximum size of a primary shard in the new shrunken index.
* This parameter can be used to calculate the lowest factor of the source index's shards number
* which satisfies the maximum shard size requirement.
*
* @param maxShardSize the maximum size of a primary shard in the new shrunken index
*/
public void setMaxShardSize(ByteSizeValue maxShardSize) {
this.maxShardSize = maxShardSize;
}

/**
* Returns the maximum size of a primary shard in the new shrunken index.
*/
public ByteSizeValue getMaxShardSize() {
return maxShardSize;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Assert;
import org.opensearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -702,6 +703,9 @@ private void resizeTest(ResizeType resizeType, CheckedFunction<ResizeRequest, Re
if (resizeType == ResizeType.SPLIT) {
resizeRequest.setSettings(Settings.builder().put("index.number_of_shards", 2).build());
}
if (resizeType == ResizeType.SHRINK) {
resizeRequest.setMaxShardSize(new ByteSizeValue(randomIntBetween(1, 1000)));
}

Request request = function.apply(resizeRequest);
Assert.assertEquals(HttpPut.METHOD_NAME, request.getMethod());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,32 @@
- match: { _index: target }
- match: { _id: "1" }
- match: { _source: { foo: "hello world" } }

# shrink with max_shard_size
- do:
allowed_warnings:
- "Parameter [master_timeout] is deprecated and will be removed in 3.0. To support inclusive language, please use [cluster_manager_timeout] instead."
indices.shrink:
index: "source"
target: "new_shrunken_index"
wait_for_active_shards: 1
master_timeout: 10s
body:
settings:
index.number_of_replicas: 0
max_shard_size: "10gb"

- do:
cluster.health:
wait_for_status: green

- do:
get:
index: "new_shrunken_index"
id: "1"

- do:
indices.get_settings:
index: "new_shrunken_index"

- match: { new_shrunken_index.settings.index.number_of_shards: "1" }
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.common.Priority;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.Index;
Expand All @@ -75,8 +76,8 @@
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.util.Arrays;
Expand Down Expand Up @@ -760,4 +761,72 @@ public void testShrinkThenSplitWithFailedNode() throws Exception {
);
ensureGreen("splitagain");
}

public void testCreateShrinkIndexWithMaxShardSize() {
internalCluster().ensureAtLeastNumDataNodes(2);
final String shrinkNode = internalCluster().startDataOnlyNode();

final int shardCount = between(2, 5);
prepareCreate("source").setSettings(
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shardCount)
).get();
for (int i = 0; i < 20; i++) {
client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
}
client().admin().indices().prepareFlush("source").get();
ensureGreen();

client().admin()
.indices()
.prepareUpdateSettings("source")
.setSettings(
Settings.builder()
.put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), shrinkNode)
.put(IndexMetadata.SETTING_BLOCKS_WRITE, true)
)
.get();
ensureGreen();

// Cannot set max_shard_size and index.number_of_shards at the same time
IllegalArgumentException exc = expectThrows(
IllegalArgumentException.class,
() -> client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
)
.setMaxShardSize(new ByteSizeValue(1))
.setResizeType(ResizeType.SHRINK)
.get()
);
assertEquals(exc.getMessage(), "Cannot set max_shard_size and index.number_of_shards at the same time!");

// use max_shard_size to calculate the target index's shards number
// set max_shard_size to 1 then the target index's shards number will be same with the source index's
assertAcked(
client().admin()
.indices()
.prepareResizeIndex("source", "target")
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.putNull(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey())
.build()
)
.setMaxShardSize(new ByteSizeValue(1))
.setResizeType(ResizeType.SHRINK)
.get()
);
ensureGreen();

GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get();
assertEquals(String.valueOf(shardCount), target.getIndexToSettings().get("target").get("index.number_of_shards"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.Version;
import org.opensearch.common.unit.ByteSizeValue;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -60,6 +62,8 @@
public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements IndicesRequest, ToXContentObject {

public static final ObjectParser<ResizeRequest, Void> PARSER = new ObjectParser<>("resize_request");
private static final ParseField MAX_SHARD_SIZE = new ParseField("max_shard_size");

static {
PARSER.declareField(
(parser, request, context) -> request.getTargetIndexRequest().settings(parser.map()),
Expand All @@ -71,19 +75,31 @@ public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements
new ParseField("aliases"),
ObjectParser.ValueType.OBJECT
);
PARSER.declareField(
ResizeRequest::setMaxShardSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_SHARD_SIZE.getPreferredName()),
MAX_SHARD_SIZE,
ObjectParser.ValueType.STRING
);
}

private CreateIndexRequest targetIndexRequest;
private String sourceIndex;
private ResizeType type = ResizeType.SHRINK;
private Boolean copySettings = true;
private ByteSizeValue maxShardSize;

public ResizeRequest(StreamInput in) throws IOException {
super(in);
targetIndexRequest = new CreateIndexRequest(in);
sourceIndex = in.readString();
type = in.readEnum(ResizeType.class);
copySettings = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.readBoolean()) {
maxShardSize = new ByteSizeValue(in);
}
}
}

ResizeRequest() {}
Expand All @@ -108,6 +124,9 @@ public ActionRequestValidationException validate() {
if (type == ResizeType.SPLIT && IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexRequest.settings()) == false) {
validationException = addValidationError("index.number_of_shards is required for split operations", validationException);
}
if (maxShardSize != null && maxShardSize.getBytes() <= 0) {
validationException = addValidationError("max_shard_size must be greater than 0", validationException);
}
assert copySettings == null || copySettings;
return validationException;
}
Expand All @@ -123,6 +142,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(sourceIndex);
out.writeEnum(type);
out.writeOptionalBoolean(copySettings);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(maxShardSize);
}
}

@Override
Expand Down Expand Up @@ -205,6 +227,24 @@ public Boolean getCopySettings() {
return copySettings;
}

/**
* Sets the maximum size of a primary shard in the new shrunken index.
* This parameter can be used to calculate the lowest factor of the source index's shards number
* which satisfies the maximum shard size requirement.
*
* @param maxShardSize the maximum size of a primary shard in the new shrunken index
*/
public void setMaxShardSize(ByteSizeValue maxShardSize) {
this.maxShardSize = maxShardSize;
}

/**
* Returns the maximum size of a primary shard in the new shrunken index.
*/
public ByteSizeValue getMaxShardSize() {
return maxShardSize;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -221,6 +261,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}
builder.endObject();
if (maxShardSize != null) {
builder.field(MAX_SHARD_SIZE.getPreferredName(), maxShardSize);
}
}
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.support.master.AcknowledgedRequestBuilder;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeValue;

/**
* Transport request builder for resizing an index
Expand Down Expand Up @@ -95,4 +96,12 @@ public ResizeRequestBuilder setResizeType(ResizeType type) {
this.request.setResizeType(type);
return this;
}

/**
* Sets the maximum size of a primary shard in the new shrunken index.
*/
public ResizeRequestBuilder setMaxShardSize(ByteSizeValue maxShardSize) {
this.request.setMaxShardSize(maxShardSize);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.store.StoreStats;

import java.io.IOException;
import java.util.Locale;
Expand Down Expand Up @@ -141,11 +143,12 @@ protected void clusterManagerOperation(
.prepareStats(sourceIndex)
.clear()
.setDocs(true)
.setStore(true)
.execute(ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, sourceIndex, targetIndex);
}, indicesStatsResponse.getPrimaries().store, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest,
ActionListener.map(
Expand All @@ -162,6 +165,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
final ResizeRequest resizeRequest,
final ClusterState state,
final IntFunction<DocsStats> perShardDocStats,
final StoreStats primaryShardsStoreStats,
String sourceIndexName,
String targetIndexName
) {
Expand All @@ -176,12 +180,16 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
targetIndexSettingsBuilder.remove(IndexMetadata.SETTING_HISTORY_UUID);
final Settings targetIndexSettings = targetIndexSettingsBuilder.build();
final int numShards;
ByteSizeValue maxShardSize = resizeRequest.getMaxShardSize();
if (IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) {
numShards = IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings);
if (resizeRequest.getResizeType() == ResizeType.SHRINK && maxShardSize != null) {
throw new IllegalArgumentException("Cannot set max_shard_size and index.number_of_shards at the same time!");
}
} else {
assert resizeRequest.getResizeType() != ResizeType.SPLIT : "split must specify the number of shards explicitly";
if (resizeRequest.getResizeType() == ResizeType.SHRINK) {
numShards = 1;
numShards = calculateTargetIndexShardsNum(maxShardSize, primaryShardsStoreStats, metadata);
} else {
assert resizeRequest.getResizeType() == ResizeType.CLONE;
numShards = metadata.getNumberOfShards();
Expand Down Expand Up @@ -250,6 +258,39 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
.copySettings(resizeRequest.getCopySettings() == null ? false : resizeRequest.getCopySettings());
}

/**
* Calculate target index's shards number according to maxShardSize and the source index's storage(only primary shards included)
* @param maxShardSize the maximum size of a primary shard in the target index
* @param sourceIndexShardStoreStats primary shards' store stats of the source index
* @param sourceIndexMetaData source index's metadata
* @return target index's shards number
*/
protected static int calculateTargetIndexShardsNum(
ByteSizeValue maxShardSize,
StoreStats sourceIndexShardStoreStats,
IndexMetadata sourceIndexMetaData
) {
if (maxShardSize == null
|| sourceIndexShardStoreStats == null
|| maxShardSize.getBytes() == 0
|| sourceIndexShardStoreStats.getSizeInBytes() == 0) {
return 1;
}

int sourceIndexShardsNum = sourceIndexMetaData.getNumberOfShards();
int minValue = (int) Math.ceil((double) sourceIndexShardStoreStats.getSizeInBytes() / maxShardSize.getBytes());
if (minValue >= sourceIndexShardsNum) {
return sourceIndexShardsNum;
}

for (int i = minValue; i < sourceIndexShardsNum; i++) {
if (sourceIndexShardsNum % i == 0) {
return i;
}
}
return sourceIndexShardsNum;
}

@Override
protected String getClusterManagerActionName(DiscoveryNode node) {
return super.getClusterManagerActionName(node);
Expand Down
Loading

0 comments on commit 5ef329a

Please sign in to comment.