Skip to content

Commit

Permalink
Fail closing or deleting indices during a full snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Mar 10, 2016
1 parent 0bbb84c commit 266394c
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,17 @@ public static class Entry {
private final State state;
private final SnapshotId snapshotId;
private final boolean includeGlobalState;
private final boolean partial;
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<String> indices;
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;

public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, List<String> indices, long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
public Entry(SnapshotId snapshotId, boolean includeGlobalState, boolean partial, State state, List<String> indices, long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this.state = state;
this.snapshotId = snapshotId;
this.includeGlobalState = includeGlobalState;
this.partial = partial;
this.indices = indices;
this.startTime = startTime;
if (shards == null) {
Expand All @@ -90,7 +92,7 @@ public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, Lis
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshotId, entry.includeGlobalState, state, entry.indices, entry.startTime, shards);
this(entry.snapshotId, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, shards);
}

public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
Expand Down Expand Up @@ -121,6 +123,10 @@ public boolean includeGlobalState() {
return includeGlobalState;
}

public boolean partial() {
return partial;
}

public long startTime() {
return startTime;
}
Expand All @@ -133,6 +139,7 @@ public boolean equals(Object o) {
Entry entry = (Entry) o;

if (includeGlobalState != entry.includeGlobalState) return false;
if (partial != entry.partial) return false;
if (startTime != entry.startTime) return false;
if (!indices.equals(entry.indices)) return false;
if (!shards.equals(entry.shards)) return false;
Expand All @@ -148,6 +155,7 @@ public int hashCode() {
int result = state.hashCode();
result = 31 * result + snapshotId.hashCode();
result = 31 * result + (includeGlobalState ? 1 : 0);
result = 31 * result + (partial ? 1 : 0);
result = 31 * result + shards.hashCode();
result = 31 * result + indices.hashCode();
result = 31 * result + waitingIndices.hashCode();
Expand Down Expand Up @@ -360,6 +368,7 @@ public SnapshotsInProgress readFrom(StreamInput in) throws IOException {
for (int i = 0; i < entries.length; i++) {
SnapshotId snapshotId = SnapshotId.readSnapshotId(in);
boolean includeGlobalState = in.readBoolean();
boolean partial = in.readBoolean();
State state = State.fromValue(in.readByte());
int indices = in.readVInt();
List<String> indexBuilder = new ArrayList<>();
Expand All @@ -375,7 +384,7 @@ public SnapshotsInProgress readFrom(StreamInput in) throws IOException {
State shardState = State.fromValue(in.readByte());
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
}
entries[i] = new Entry(snapshotId, includeGlobalState, state, Collections.unmodifiableList(indexBuilder), startTime, builder.build());
entries[i] = new Entry(snapshotId, includeGlobalState, partial, state, Collections.unmodifiableList(indexBuilder), startTime, builder.build());
}
return new SnapshotsInProgress(entries);
}
Expand All @@ -386,6 +395,7 @@ public void writeTo(StreamOutput out) throws IOException {
for (Entry entry : entries) {
entry.snapshotId().writeTo(out);
out.writeBoolean(entry.includeGlobalState());
out.writeBoolean(entry.partial());
out.writeByte(entry.state().value());
out.writeVInt(entry.indices().size());
for (String index : entry.indices()) {
Expand All @@ -406,6 +416,7 @@ static final class Fields {
static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots");
static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot");
static final XContentBuilderString INCLUDE_GLOBAL_STATE = new XContentBuilderString("include_global_state");
static final XContentBuilderString PARTIAL = new XContentBuilderString("partial");
static final XContentBuilderString STATE = new XContentBuilderString("state");
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString START_TIME_MILLIS = new XContentBuilderString("start_time_millis");
Expand All @@ -431,6 +442,7 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p
builder.field(Fields.REPOSITORY, entry.snapshotId().getRepository());
builder.field(Fields.SNAPSHOT, entry.snapshotId().getSnapshot());
builder.field(Fields.INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
builder.field(Fields.PARTIAL, entry.partial());
builder.field(Fields.STATE, entry.state());
builder.startArray(Fields.INDICES);
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -67,7 +68,7 @@ public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, Clus
}

public void deleteIndices(final Request request, final Listener userListener) {
Collection<String> indices = Arrays.asList(request.indices);
Set<String> indices = Sets.newHashSet(request.indices);
final DeleteIndexListener listener = new DeleteIndexListener(userListener);

clusterService.submitStateUpdateTask("delete-index " + indices, new ClusterStateUpdateTask(Priority.URGENT) {
Expand All @@ -84,6 +85,9 @@ public void onFailure(String source, Throwable t) {

@Override
public ClusterState execute(final ClusterState currentState) {
// Check if index deletion conflicts with any running snapshots
SnapshotsService.checkIndexDeletion(currentState, indices);

RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

package org.elasticsearch.cluster.metadata;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -39,8 +37,9 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -99,27 +98,10 @@ public ClusterState execute(ClusterState currentState) {
return currentState;
}

// Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index
// is found as closing an index that is being restored makes the index unusable (it cannot be recovered).
RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
if (restore != null) {
Set<String> indicesToFail = null;
for (RestoreInProgress.Entry entry : restore.entries()) {
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
if (!shard.value.state().completed()) {
if (indicesToClose.contains(shard.key.getIndexName())) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();
}
indicesToFail.add(shard.key.getIndexName());
}
}
}
}
if (indicesToFail != null) {
throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail);
}
}
// Check if index closing conflicts with any running restores
RestoreService.checkIndexClosing(currentState, indicesToClose);
// Check if index closing conflicts with any running snapshots
SnapshotsService.checkIndexClosing(currentState, indicesToClose);

logger.info("closing indices [{}]", indicesAsString);

Expand Down
26 changes: 26 additions & 0 deletions core/src/main/java/org/elasticsearch/snapshots/RestoreService.java
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,32 @@ private boolean failed(Snapshot snapshot, String index) {
return false;
}

/**
* Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index
* is found as closing an index that is being restored makes the index unusable (it cannot be recovered).
*/
public static void checkIndexClosing(ClusterState currentState, Set<String> indices) {
RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
if (restore != null) {
Set<String> indicesToFail = null;
for (RestoreInProgress.Entry entry : restore.entries()) {
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
if (!shard.value.state().completed()) {
if (indices.contains(shard.key.getIndexName())) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();
}
indicesToFail.add(shard.key.getIndexName());
}
}
}
}
if (indicesToFail != null) {
throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail);
}
}
}

/**
* Adds restore completion listener
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public ClusterState execute(ClusterState currentState) {
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndices(currentState, request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", request.repository(), request.name(), indices);
newSnapshot = new SnapshotsInProgress.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, System.currentTimeMillis(), null);
newSnapshot = new SnapshotsInProgress.Entry(snapshotId, request.includeGlobalState(), request.partial(), State.INIT, indices, System.currentTimeMillis(), null);
snapshots = new SnapshotsInProgress(newSnapshot);
} else {
// TODO: What should we do if a snapshot is already running?
Expand All @@ -228,7 +228,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() {
@Override
public void run() {
beginSnapshot(newState, newSnapshot, request.partial, listener);
beginSnapshot(newState, newSnapshot, request.partial(), listener);
}
});
}
Expand Down Expand Up @@ -1061,6 +1061,63 @@ private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard
return builder.build();
}

/**
* Check if any of the indices to be deleted are currently being snapshotted. Fail as deleting an index that is being
* snapshotted (with partial == false) makes the snapshot fail.
*/
public static void checkIndexDeletion(ClusterState currentState, Set<String> indices) {
Set<String> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices);
if (indicesToFail != null) {
throw new IllegalArgumentException("Cannot delete indices that are being snapshotted: " + indicesToFail +
". Try again after snapshot finishes or cancel the currently running snapshot.");
}
}

/**
* Check if any of the indices to be closed are currently being snapshotted. Fail as closing an index that is being
* snapshotted (with partial == false) makes the snapshot fail.
*/
public static void checkIndexClosing(ClusterState currentState, Set<String> indices) {
Set<String> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices);
if (indicesToFail != null) {
throw new IllegalArgumentException("Cannot close indices that are being snapshotted: " + indicesToFail +
". Try again after snapshot finishes or cancel the currently running snapshot.");
}
}

private static Set<String> indicesToFailForCloseOrDeletion(ClusterState currentState, Set<String> indices) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
Set<String> indicesToFail = null;
if (snapshots != null) {
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.partial() == false) {
if (entry.state() == State.INIT) {
for (String index : entry.indices()) {
if (indices.contains(index)) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();
}
indicesToFail.add(index);
}
}
} else {
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
if (!shard.value.state().completed()) {
if (indices.contains(shard.key.getIndexName())) {
if (indicesToFail == null) {
indicesToFail = new HashSet<>();
}
indicesToFail.add(shard.key.getIndexName());
}
}
}
}
}
}
}
return indicesToFail;
}

/**
* Adds snapshot completion listener
*
Expand Down Expand Up @@ -1302,6 +1359,15 @@ public boolean includeGlobalState() {
return includeGlobalState;
}

/**
* Returns true if partial snapshot should be allowed
*
* @return true if partial snapshot should be allowed
*/
public boolean partial() {
return partial;
}

/**
* Returns master node timeout
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ public ClusterState.Custom randomCreate(String name) {
return new SnapshotsInProgress(new SnapshotsInProgress.Entry(
new SnapshotId(randomName("repo"), randomName("snap")),
randomBoolean(),
randomBoolean(),
SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)),
Collections.<String>emptyList(),
Math.abs(randomLong()),
Expand Down
Loading

0 comments on commit 266394c

Please sign in to comment.