Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport the Close Index API refactoring to 6.x #37359

Merged
merged 9 commits into from
Jan 14, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.Request;
import org.junit.After;
import org.junit.Before;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
*/
public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {

CloseIndexClusterStateUpdateRequest() {
private final long taskId;

public CloseIndexClusterStateUpdateRequest(final long taskId) {
this.taskId = taskId;
}

public long taskId() {
return taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -100,24 +99,32 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta
@Override
protected void masterOperation(final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}

@Override
protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
listener.onResponse(new AcknowledgedResponse(true));
return;
}
CloseIndexClusterStateUpdateRequest updateRequest = new CloseIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);

indexStateService.closeIndices(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);

indexStateService.closeIndices(closeRequest, new ActionListener<AcknowledgedResponse>() {

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
public void onResponse(final AcknowledgedResponse response) {
listener.onResponse(response);
}

@Override
public void onFailure(Exception t) {
public void onFailure(final Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;

public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {

public static final String NAME = CloseIndexAction.NAME + "[s]";

@Inject
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
final ClusterService clusterService, final IndicesService indicesService,
final ThreadPool threadPool, final ShardStateAction stateAction,
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
}

@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}

@Override
protected void acquirePrimaryOperationPermit(final IndexShard primary,
final ShardRequest request,
final ActionListener<Releasable> onAcquired) {
primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout());
}

@Override
protected void acquireReplicaOperationPermit(final IndexShard replica,
final ShardRequest request,
final ActionListener<Releasable> onAcquired,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdateOrDeletes) {
replica.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNoOfUpdateOrDeletes, onAcquired, request.timeout());
}

@Override
protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
final IndexShard primary) throws Exception {
executeShardOperation(shardRequest, primary);
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}

@Override
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
executeShardOperation(shardRequest, replica);
return new ReplicaResult();
}

private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
if (indexShard.getActiveOperationsCount() != 0) {
throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing");
}

final ClusterBlocks clusterBlocks = clusterService.state().blocks();
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
}

final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
if (indexShard.getGlobalCheckpoint() != maxSeqNo) {
throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
}
indexShard.flush(new FlushRequest());
logger.debug("{} shard is ready for closing", shardId);
}

@Override
protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy(final long primaryTerm) {
return new VerifyShardBeforeCloseActionReplicasProxy(primaryTerm);
}

/**
* A {@link ReplicasProxy} that marks as stale the shards that are unavailable during the verification
* and the flush of the shard. This is done to ensure that such shards won't be later promoted as primary
* or reopened in an unverified state with potential non flushed translog operations.
*/
class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy {

VerifyShardBeforeCloseActionReplicasProxy(final long primaryTerm) {
super(primaryTerm);
}

@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted, final Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
}

public static class ShardRequest extends ReplicationRequest<ShardRequest> {

private ClusterBlock clusterBlock;

ShardRequest(){
}

public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
super(shardId);
this.clusterBlock = Objects.requireNonNull(clusterBlock);
setParentTask(parentTaskId);
}

@Override
public String toString() {
return "verify shard " + shardId + " before close with block " + clusterBlock;
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
clusterBlock = ClusterBlock.readClusterBlock(in);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
clusterBlock.writeTo(out);
}

public ClusterBlock clusterBlock() {
return clusterBlock;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.block;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand All @@ -31,29 +32,31 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Objects;

public class ClusterBlock implements Streamable, ToXContentFragment {

private int id;

private @Nullable String uuid;
private String description;

private EnumSet<ClusterBlockLevel> levels;

private boolean retryable;

private boolean disableStatePersistence = false;

private boolean allowReleaseResources;

private RestStatus status;

ClusterBlock() {
private ClusterBlock() {
}

public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this(id, null, description, retryable, disableStatePersistence, allowReleaseResources, status, levels);
}

public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, boolean allowReleaseResources,
RestStatus status, EnumSet<ClusterBlockLevel> levels) {
public ClusterBlock(int id, String uuid, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this.id = id;
this.uuid = uuid;
this.description = description;
this.retryable = retryable;
this.disableStatePersistence = disableStatePersistence;
Expand All @@ -66,6 +69,10 @@ public int id() {
return this.id;
}

public String uuid() {
return uuid;
}

public String description() {
return this.description;
}
Expand Down Expand Up @@ -105,6 +112,9 @@ public boolean disableStatePersistence() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Integer.toString(id));
if (uuid != null) {
builder.field("uuid", uuid);
}
builder.field("description", description);
builder.field("retryable", retryable);
if (disableStatePersistence) {
Expand All @@ -128,6 +138,11 @@ public static ClusterBlock readClusterBlock(StreamInput in) throws IOException {
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
uuid = in.readOptionalString();
} else {
uuid = null;
}
description = in.readString();
final int len = in.readVInt();
ArrayList<ClusterBlockLevel> levels = new ArrayList<>(len);
Expand All @@ -148,6 +163,9 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeOptionalString(uuid);
}
out.writeString(description);
out.writeVInt(levels.size());
for (ClusterBlockLevel level : levels) {
Expand All @@ -164,7 +182,11 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(id).append(",").append(description).append(", blocks ");
sb.append(id).append(",");
if (uuid != null) {
sb.append(uuid).append(',');
}
sb.append(description).append(", blocks ");
String delimiter = "";
for (ClusterBlockLevel level : levels) {
sb.append(delimiter).append(level.name());
Expand All @@ -175,19 +197,19 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ClusterBlock that = (ClusterBlock) o;

if (id != that.id) return false;

return true;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ClusterBlock that = (ClusterBlock) o;
return id == that.id && Objects.equals(uuid, that.uuid);
}

@Override
public int hashCode() {
return id;
return Objects.hash(id, uuid);
}

public boolean isAllowReleaseResources() {
Expand Down
Loading