Skip to content

Commit

Permalink
Revert "Remove obsolete resolving logic from TRA (#49647)"
Browse files Browse the repository at this point in the history
This reverts commit 0827ea2.
  • Loading branch information
ywelsch committed Nov 28, 2019
1 parent 0827ea2 commit 04e9cbd
Show file tree
Hide file tree
Showing 22 changed files with 121 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -57,8 +58,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
final ClusterService clusterService, final IndicesService indicesService,
final ThreadPool threadPool, final ShardStateAction stateAction,
final ActionFilters actionFilters) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters,
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -43,9 +44,9 @@ public class TransportShardFlushAction
@Inject
public TransportShardFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
actionFilters, indexNameExpressionResolver, ShardFlushRequest::new, ShardFlushRequest::new, ThreadPool.Names.FLUSH);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -45,9 +46,9 @@ public class TransportShardRefreshAction
@Inject
public TransportShardRefreshAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
indexNameExpressionResolver, BasicReplicationRequest::new, BasicReplicationRequest::new, ThreadPool.Names.REFRESH);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -90,9 +91,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
@Inject
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
}
Expand All @@ -107,6 +109,11 @@ protected BulkShardResponse newResponseInstance(StreamInput in) throws IOExcepti
return new BulkShardResponse(in);
}

@Override
protected boolean resolveIndex() {
return false;
}

@Override
protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard primary,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -54,9 +55,10 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
@Inject
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters) {
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
true /* we should never reject resync because of thread pool capacity on primary */);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
Expand Down Expand Up @@ -104,6 +105,7 @@ public abstract class TransportReplicationAction<
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
protected final IndicesService indicesService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
protected final TransportRequestOptions transportOptions;
protected final String executor;

Expand All @@ -116,17 +118,19 @@ public abstract class TransportReplicationAction<
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
requestReader, replicaRequestReader, executor, false, false);
indexNameExpressionResolver, requestReader, replicaRequestReader, executor, false, false);
}


protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader, String executor,
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
super(actionName, actionFilters, transportService.getTaskManager());
Expand All @@ -135,6 +139,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.executor = executor;

this.transportPrimaryAction = actionName + "[p]";
Expand Down Expand Up @@ -215,10 +220,21 @@ public ClusterBlockLevel indexBlockLevel() {
return null;
}

/**
* True if provided index should be resolved when resolving request
*/
protected boolean resolveIndex() {
return true;
}

protected TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}

private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
}

private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
if (globalBlockLevel != null) {
Expand Down Expand Up @@ -633,7 +649,7 @@ public void onFailure(Exception e) {
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState();
final String concreteIndex = request.shardId().getIndexName();
final String concreteIndex = concreteIndex(state, request);
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
if (blockException != null) {
if (blockException.retryable()) {
Expand All @@ -643,6 +659,7 @@ protected void doRun() {
finishAsFailed(blockException);
}
} else {
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -60,10 +61,11 @@ public abstract class TransportWriteAction<

protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
request, replicaRequest, executor, true, forceExecutionOnPrimary);
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
}

/** Syncs operation result to the translog or throws a shard not available failure */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -63,7 +64,8 @@ public GlobalCheckpointSyncAction(
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters) {
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
Expand All @@ -73,6 +75,7 @@ public GlobalCheckpointSyncAction(
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -76,7 +77,8 @@ public RetentionLeaseBackgroundSyncAction(
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters) {
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
Expand All @@ -86,6 +88,7 @@ public RetentionLeaseBackgroundSyncAction(
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -73,7 +74,8 @@ public RetentionLeaseSyncAction(
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters) {
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
Expand All @@ -83,6 +85,7 @@ public RetentionLeaseSyncAction(
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
ThreadPool.Names.MANAGEMENT, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void setUp() throws Exception {

ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, transportService, clusterService, mock(IndicesService.class),
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class));
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
}

@Override
Expand Down
Loading

0 comments on commit 04e9cbd

Please sign in to comment.