Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove obsolete resolving logic from TRA #49685

Merged
merged 7 commits into from
Nov 29, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -60,8 +59,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
final ClusterService clusterService, final IndicesService indicesService,
final ThreadPool threadPool, final ShardStateAction stateAction,
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
final ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters,
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -46,9 +45,9 @@ public class TransportShardFlushAction
@Inject
public TransportShardFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
actionFilters, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -48,9 +47,9 @@ public class TransportShardRefreshAction
@Inject
public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public String getDescription() {
return stringBuilder.toString();
}

@Override
public BulkShardRequest routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
return super.routedBasedOnClusterVersion(routedBasedOnClusterVersion);
}

@Override
public void onRetry() {
for (BulkItemRequest item : items) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ protected void doRun() {
requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -93,10 +92,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
@Inject
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
}
Expand All @@ -111,11 +109,6 @@ protected BulkShardResponse newResponseInstance(StreamInput in) throws IOExcepti
return new BulkShardResponse(in);
}

@Override
protected boolean resolveIndex() {
return false;
}

@Override
protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard primary,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -55,10 +54,9 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
@Inject
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
ShardStateAction shardStateAction, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
true /* we should never reject resync because of thread pool capacity on primary */);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public final Request waitForActiveShards(final int waitForActiveShards) {
* Used to prevent redirect loops, see also {@link TransportReplicationAction.ReroutePhase#doRun()}
*/
@SuppressWarnings("unchecked")
Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
protected Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) {
this.routedBasedOnClusterVersion = routedBasedOnClusterVersion;
return (Request) this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
Expand Down Expand Up @@ -104,7 +103,6 @@ public abstract class TransportReplicationAction<
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
protected final IndicesService indicesService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
protected final TransportRequestOptions transportOptions;
protected final String executor;

Expand All @@ -117,19 +115,17 @@ public abstract class TransportReplicationAction<
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false);
requestReader, replicaRequestReader, executor, false, false);
}


protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor,
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
super(actionName, actionFilters, transportService.getTaskManager());
Expand All @@ -138,7 +134,6 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.executor = executor;

this.transportPrimaryAction = actionName + "[p]";
Expand Down Expand Up @@ -219,21 +214,10 @@ public ClusterBlockLevel indexBlockLevel() {
return null;
}

/**
* True if provided index should be resolved when resolving request
*/
protected boolean resolveIndex() {
return true;
}

protected TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}

private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
}

private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
if (globalBlockLevel != null) {
Expand Down Expand Up @@ -648,8 +632,7 @@ public void onFailure(Exception e) {
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState();
final String concreteIndex = concreteIndex(state, request);
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
Expand All @@ -658,23 +641,47 @@ protected void doRun() {
finishAsFailed(blockException);
}
} else {
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
final IndexMetaData indexMetaData = state.metaData().index(request.shardId().getIndex());
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
return;
// ensure that the cluster state on the node is at least as high as the node that decided that the index was there
if (state.version() < request.routedBasedOnClusterVersion()) {
logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
"Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
"] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
request.shardId().getIndexName()));
return;
} else {
finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
return;
}
}

if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
finishAsFailed(new IndexClosedException(indexMetaData.getIndex()));
return;
}

// resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request);
if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
// if the wait for active shard count has not been set in the request,
// resolve it from the index settings
request.waitForActiveShards(indexMetaData.getWaitForActiveShards());
}
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
"request waitForActiveShards must be set in resolveRequest";

final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
if (primary == null || primary.active() == false) {
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]", request.shardId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
return;
}
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -61,11 +60,10 @@ public abstract class TransportWriteAction<

protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
request, replicaRequest, executor, true, forceExecutionOnPrimary);
}

/** Syncs operation result to the translog or throws a shard not available failure */
Expand Down
Loading