Skip to content

Commit

Permalink
Upstream fetch
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <pgodithi@amazon.com>
  • Loading branch information
prudhvigodithi committed Feb 19, 2025
1 parent db5212b commit 6b8e897
Show file tree
Hide file tree
Showing 31 changed files with 1,953 additions and 1,041 deletions.
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@
import org.opensearch.action.admin.indices.resolve.ResolveIndexAction;
import org.opensearch.action.admin.indices.rollover.RolloverAction;
import org.opensearch.action.admin.indices.rollover.TransportRolloverAction;
import org.opensearch.action.admin.indices.searchonly.SearchOnlyAction;
import org.opensearch.action.admin.indices.searchonly.TransportSearchOnlyAction;
import org.opensearch.action.admin.indices.scale.searchonly.SearchOnlyScaleAction;
import org.opensearch.action.admin.indices.scale.searchonly.TransportSearchOnlyAction;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
Expand Down Expand Up @@ -421,7 +421,7 @@
import org.opensearch.rest.action.admin.indices.RestResizeHandler;
import org.opensearch.rest.action.admin.indices.RestResolveIndexAction;
import org.opensearch.rest.action.admin.indices.RestRolloverIndexAction;
import org.opensearch.rest.action.admin.indices.RestSearchonlyAction;
import org.opensearch.rest.action.admin.indices.RestSearchOnlyAction;
import org.opensearch.rest.action.admin.indices.RestSimulateIndexTemplateAction;
import org.opensearch.rest.action.admin.indices.RestSimulateTemplateAction;
import org.opensearch.rest.action.admin.indices.RestSyncedFlushAction;
Expand Down Expand Up @@ -688,7 +688,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(AutoPutMappingAction.INSTANCE, TransportAutoPutMappingAction.class);
actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
actions.register(SearchOnlyAction.INSTANCE, TransportSearchOnlyAction.class);
actions.register(SearchOnlyScaleAction.INSTANCE, TransportSearchOnlyAction.class);
actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
Expand Down Expand Up @@ -910,7 +910,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestUpgradeAction());
registerHandler.accept(new RestUpgradeStatusAction());
registerHandler.accept(new RestClearIndicesCacheAction());
registerHandler.accept(new RestSearchonlyAction());
registerHandler.accept(new RestSearchOnlyAction());
registerHandler.accept(new RestIndexAction());
registerHandler.accept(new CreateHandler());
registerHandler.accept(new AutoIdHandler(nodesInCluster));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale.searchonly;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.transport.TransportRequest;

import java.io.IOException;
import java.util.List;

/**
* A transport request sent to nodes to facilitate shard synchronization during search-only scaling operations.
* <p>
* This request is sent from the cluster manager to data nodes that host primary shards for the target index
* during scale operations. It contains the index name and a list of shard IDs that need to be synchronized
* before completing a scale-down operation.
* <p>
* When a node receives this request, it performs final sync and flush operations on the specified shards,
* ensuring all operations are committed and the remote store is synced. This is a crucial step in
* the scale-down process to ensure no data loss occurs when the index transitions to search-only mode.
*/
class NodeSearchOnlyRequest extends TransportRequest {
private final String index;
private final List<ShardId> shardIds;

/**
* Constructs a new NodeSearchOnlyRequest.
*
* @param index the name of the index being scaled
* @param shardIds the list of shard IDs to be synchronized on the target node
*/
NodeSearchOnlyRequest(String index, List<ShardId> shardIds) {
this.index = index;
this.shardIds = shardIds;
}

/**
* Deserialization constructor.
*
* @param in the stream input to read from
* @throws IOException if there is an I/O error during deserialization
*/
NodeSearchOnlyRequest(StreamInput in) throws IOException {
super(in);
this.index = in.readString();
this.shardIds = in.readList(ShardId::new);
}

/**
* Serializes this request to the given output stream.
*
* @param out the output stream to write to
* @throws IOException if there is an I/O error during serialization
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeList(shardIds);
}

/**
* Returns the index name associated with this request.
*
* @return the index name
*/
String getIndex() {
return index;
}

/**
* Returns the list of shard IDs to be synchronized.
*
* @return the list of shard IDs
*/
List<ShardId> getShardIds() {
return shardIds;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale.searchonly;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.transport.TransportResponse;

import java.io.IOException;
import java.util.List;

/**
* Response sent from nodes after processing a {@link NodeSearchOnlyRequest} during search-only scaling operations.
* <p>
* This response contains information about the node that processed the request and the results of
* synchronization attempts for each requested shard. The cluster manager uses these responses to
* determine whether it's safe to proceed with finalizing a scale-down operation.
* <p>
* Each response includes details about whether shards have any uncommitted operations or need
* additional synchronization, which would indicate the scale operation should be delayed until
* the cluster reaches a stable state.
*/
class NodeSearchOnlyResponse extends TransportResponse {
private final DiscoveryNode node;
private final List<ShardSearchOnlyResponse> shardResponses;

/**
* Constructs a new NodeSearchOnlyResponse.
*
* @param node the node that processed the synchronization request
* @param shardResponses the list of responses from individual shard synchronization attempts
*/
NodeSearchOnlyResponse(DiscoveryNode node, List<ShardSearchOnlyResponse> shardResponses) {
this.node = node;
this.shardResponses = shardResponses;
}

/**
* Deserialization constructor.
*
* @param in the stream input to read from
* @throws IOException if there is an I/O error during deserialization
*/
NodeSearchOnlyResponse(StreamInput in) throws IOException {
node = new DiscoveryNode(in);
shardResponses = in.readList(ShardSearchOnlyResponse::new);
}

/**
* Serializes this response to the given output stream.
*
* @param out the output stream to write to
* @throws IOException if there is an I/O error during serialization
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeList(shardResponses);
}

/**
* Returns the node that processed the synchronization request.
*
* @return the discovery node information
*/
public DiscoveryNode getNode() {
return node;
}

/**
* Returns the list of shard-level synchronization responses.
* <p>
* These responses contain critical information about the state of each shard,
* including whether there are uncommitted operations or if additional synchronization
* is needed before the scale operation can safely proceed.
*
* @return the list of shard responses
*/
public List<ShardSearchOnlyResponse> getShardResponses() {
return shardResponses;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.scale.searchonly;

import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.indices.replication.common.ReplicationType;

/**
* Validates that indices meet the prerequisites for search-only scale operations.
* <p>
* This validator ensures that indexes being scaled up or down satisfy all the
* necessary conditions for a safe scaling operation. It checks for required settings,
* index state compatibility, and configuration prerequisites such as remote store
* and segment replication settings.
*/
class ScaleOperationValidator {

/**
* Validates that the given index meets the prerequisites for the scale operation.
* <p>
* For scale-down operations, this method verifies:
* <ul>
* <li>The index exists</li>
* <li>The index is not already in search-only mode</li>
* <li>The index has at least one search-only replica configured</li>
* <li>Remote store is enabled for the index</li>
* <li>Segment replication is enabled for the index</li>
* </ul>
* <p>
* For scale-up operations, this method verifies:
* <ul>
* <li>The index exists</li>
* <li>The index is currently in search-only mode</li>
* </ul>
*
* @param indexMetadata the metadata of the index to validate
* @param index the name of the index being validated
* @param listener the action listener to notify in case of validation failure
* @param isScaleDown true if validating for scale-down, false for scale-up
* @return true if validation succeeds, false if validation fails (and listener is notified)
*/
boolean validateScalePrerequisites(
IndexMetadata indexMetadata,
String index,
ActionListener<AcknowledgedResponse> listener,
boolean isScaleDown
) {
try {
if (indexMetadata == null) {
throw new IllegalArgumentException("Index [" + index + "] not found");
}
if (isScaleDown) {
if (indexMetadata.getSettings().getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false)) {
throw new IllegalStateException("Index [" + index + "] is already in search-only mode");
}

if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) {
throw new IllegalArgumentException("Cannot scale to zero without search replicas for index: " + index);
}
if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
throw new IllegalArgumentException(
"To scale to zero, " + IndexMetadata.SETTING_REMOTE_STORE_ENABLED + " must be enabled for index: " + index
);
}
if (!ReplicationType.SEGMENT.toString().equals(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))) {
throw new IllegalArgumentException("To scale to zero, segment replication must be enabled for index: " + index);
}
} else {
if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false)) {
throw new IllegalStateException("Index [" + index + "] is not in search-only mode");
}
}
return true;
} catch (Exception e) {
listener.onFailure(e);
return false;
}
}
}
Loading

0 comments on commit 6b8e897

Please sign in to comment.