Skip to content

Commit

Permalink
[Close Index API] Propagate tasks ids between Freeze, Close and Verif…
Browse files Browse the repository at this point in the history
…y Shard actions (#36630)

This pull request changes the Freeze Index and Close Index actions so 
that these actions always requires a Task. The task's id is then propagated 
from the Freeze action to the Close action, and then to the Verify shard action. 
This way it is possible to track which Freeze task initiates the closing of an index, 
and which consecutive verifiy shard are executed for the index closing.
  • Loading branch information
tlrx authored Jan 7, 2019
1 parent bd2af2c commit 1959388
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
*/
public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {

public CloseIndexClusterStateUpdateRequest() {
private final long taskId;

public CloseIndexClusterStateUpdateRequest(final long taskId) {
this.taskId = taskId;
}

public long taskId() {
return taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,19 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta
@Override
protected void masterOperation(final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}

@Override
protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
listener.onResponse(new AcknowledgedResponse(true));
return;
}

final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest()
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -141,8 +142,9 @@ public static class ShardRequest extends ReplicationRequest<ShardRequest> {
ShardRequest(){
}

public ShardRequest(final ShardId shardId) {
public ShardRequest(final ShardId shardId, final TaskId parentTaskId) {
super(shardId);
setParentTask(parentTaskId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
Expand All @@ -63,6 +62,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -120,9 +120,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina
throw new IllegalArgumentException("Index name is required");
}

final TimeValue timeout = request.ackTimeout();
final TimeValue masterTimeout = request.masterNodeTimeout();

clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT) {

Expand All @@ -141,7 +138,7 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta
} else {
assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(new WaitForClosedBlocksApplied(blockedIndices, timeout,
.execute(new WaitForClosedBlocksApplied(blockedIndices, request,
ActionListener.wrap(closedBlocksResults ->
clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
Expand Down Expand Up @@ -176,7 +173,7 @@ public void onFailure(final String source, final Exception e) {

@Override
public TimeValue timeout() {
return masterTimeout;
return request.masterNodeTimeout();
}
}
);
Expand Down Expand Up @@ -246,18 +243,18 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterSta
class WaitForClosedBlocksApplied extends AbstractRunnable {

private final Set<Index> blockedIndices;
private final @Nullable TimeValue timeout;
private final CloseIndexClusterStateUpdateRequest request;
private final ActionListener<Map<Index, AcknowledgedResponse>> listener;

private WaitForClosedBlocksApplied(final Set<Index> blockedIndices,
final @Nullable TimeValue timeout,
final CloseIndexClusterStateUpdateRequest request,
final ActionListener<Map<Index, AcknowledgedResponse>> listener) {
if (blockedIndices == null || blockedIndices.isEmpty()) {
throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices");
}
this.blockedIndices = blockedIndices;
this.request = request;
this.listener = listener;
this.timeout = timeout;
}

@Override
Expand All @@ -271,7 +268,7 @@ protected void doRun() throws Exception {
final CountDown countDown = new CountDown(blockedIndices.size());
final ClusterState state = clusterService.state();
for (Index blockedIndex : blockedIndices) {
waitForShardsReadyForClosing(blockedIndex, state, timeout, response -> {
waitForShardsReadyForClosing(blockedIndex, state, response -> {
results.put(blockedIndex, response);
if (countDown.countDown()) {
listener.onResponse(unmodifiableMap(results));
Expand All @@ -280,7 +277,7 @@ protected void doRun() throws Exception {
}
}

private void waitForShardsReadyForClosing(final Index index, final ClusterState state, @Nullable final TimeValue timeout,
private void waitForShardsReadyForClosing(final Index index, final ClusterState state,
final Consumer<AcknowledgedResponse> onResponse) {
final IndexMetaData indexMetaData = state.metaData().index(index);
if (indexMetaData == null) {
Expand All @@ -302,7 +299,7 @@ private void waitForShardsReadyForClosing(final Index index, final ClusterState
for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
final IndexShardRoutingTable shardRoutingTable = shard.value;
final ShardId shardId = shardRoutingTable.shardId();
sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new NotifyOnceListener<ReplicationResponse>() {
sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener<ReplicationResponse>() {
@Override
public void innerOnResponse(final ReplicationResponse replicationResponse) {
ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo();
Expand All @@ -326,7 +323,7 @@ private void processIfFinished() {
}
}

private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, @Nullable final TimeValue timeout,
private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable,
final ActionListener<ReplicationResponse> listener) {
final ShardId shardId = shardRoutingTable.shardId();
if (shardRoutingTable.primaryShard().unassigned()) {
Expand All @@ -336,10 +333,11 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar
listener.onResponse(response);
return;
}
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId());
final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId);
if (timeout != null) {
shardRequest.timeout(timeout);
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, parentTaskId);
if (request.ackTimeout() != null) {
shardRequest.timeout(request.ackTimeout());
}
// TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests
transportVerifyShardBeforeCloseAction.execute(shardRequest, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -130,7 +131,7 @@ public static void afterClass() {

private void executeOnPrimaryOrReplica() throws Exception {
final TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId());
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), new TaskId("_node_id", randomNonNegativeLong()));
if (randomBoolean()) {
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
} else {
Expand Down Expand Up @@ -204,7 +205,8 @@ public void testUnavailableShardsMarkedAsStale() throws Exception {
assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0));

final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>();
TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId);
TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, new TaskId(clusterService.localNode().getId(), 0L));
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(primaryTerm);
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,19 @@ private Index[] resolveIndices(FreezeRequest request, ClusterState state) {

@Override
protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener<FreezeResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}

@Override
protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeRequest request, ClusterState state,
ActionListener<TransportFreezeIndexAction.FreezeResponse> listener) throws Exception {
final Index[] concreteIndices = resolveIndices(request, state);
if (concreteIndices.length == 0) {
listener.onResponse(new FreezeResponse(true, true));
return;
}

final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest()
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);
Expand Down

0 comments on commit 1959388

Please sign in to comment.