diff --git a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 78eef31633208..c90edee0d50f9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -69,15 +69,17 @@ public static class Entry { private final State state; private final SnapshotId snapshotId; private final boolean includeGlobalState; + private final boolean partial; private final ImmutableOpenMap shards; private final List indices; private final ImmutableOpenMap> waitingIndices; private final long startTime; - public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, List indices, long startTime, ImmutableOpenMap shards) { + public Entry(SnapshotId snapshotId, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, ImmutableOpenMap shards) { this.state = state; this.snapshotId = snapshotId; this.includeGlobalState = includeGlobalState; + this.partial = partial; this.indices = indices; this.startTime = startTime; if (shards == null) { @@ -90,7 +92,7 @@ public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, Lis } public Entry(Entry entry, State state, ImmutableOpenMap shards) { - this(entry.snapshotId, entry.includeGlobalState, state, entry.indices, entry.startTime, shards); + this(entry.snapshotId, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, shards); } public Entry(Entry entry, ImmutableOpenMap shards) { @@ -121,6 +123,10 @@ public boolean includeGlobalState() { return includeGlobalState; } + public boolean partial() { + return partial; + } + public long startTime() { return startTime; } @@ -133,6 +139,7 @@ public boolean equals(Object o) { Entry entry = (Entry) o; if (includeGlobalState != entry.includeGlobalState) return false; + if (partial != entry.partial) return false; if (startTime != entry.startTime) return false; if (!indices.equals(entry.indices)) return false; if (!shards.equals(entry.shards)) return false; @@ -148,6 +155,7 @@ public int hashCode() { int result = state.hashCode(); result = 31 * result + snapshotId.hashCode(); result = 31 * result + (includeGlobalState ? 1 : 0); + result = 31 * result + (partial ? 1 : 0); result = 31 * result + shards.hashCode(); result = 31 * result + indices.hashCode(); result = 31 * result + waitingIndices.hashCode(); @@ -360,6 +368,7 @@ public SnapshotsInProgress readFrom(StreamInput in) throws IOException { for (int i = 0; i < entries.length; i++) { SnapshotId snapshotId = SnapshotId.readSnapshotId(in); boolean includeGlobalState = in.readBoolean(); + boolean partial = in.readBoolean(); State state = State.fromValue(in.readByte()); int indices = in.readVInt(); List indexBuilder = new ArrayList<>(); @@ -375,7 +384,7 @@ public SnapshotsInProgress readFrom(StreamInput in) throws IOException { State shardState = State.fromValue(in.readByte()); builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState)); } - entries[i] = new Entry(snapshotId, includeGlobalState, state, Collections.unmodifiableList(indexBuilder), startTime, builder.build()); + entries[i] = new Entry(snapshotId, includeGlobalState, partial, state, Collections.unmodifiableList(indexBuilder), startTime, builder.build()); } return new SnapshotsInProgress(entries); } @@ -386,6 +395,7 @@ public void writeTo(StreamOutput out) throws IOException { for (Entry entry : entries) { entry.snapshotId().writeTo(out); out.writeBoolean(entry.includeGlobalState()); + out.writeBoolean(entry.partial()); out.writeByte(entry.state().value()); out.writeVInt(entry.indices().size()); for (String index : entry.indices()) { @@ -406,6 +416,7 @@ static final class Fields { static final XContentBuilderString SNAPSHOTS = new XContentBuilderString("snapshots"); static final XContentBuilderString SNAPSHOT = new XContentBuilderString("snapshot"); static final XContentBuilderString INCLUDE_GLOBAL_STATE = new XContentBuilderString("include_global_state"); + static final XContentBuilderString PARTIAL = new XContentBuilderString("partial"); static final XContentBuilderString STATE = new XContentBuilderString("state"); static final XContentBuilderString INDICES = new XContentBuilderString("indices"); static final XContentBuilderString START_TIME_MILLIS = new XContentBuilderString("start_time_millis"); @@ -431,6 +442,7 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p builder.field(Fields.REPOSITORY, entry.snapshotId().getRepository()); builder.field(Fields.SNAPSHOT, entry.snapshotId().getSnapshot()); builder.field(Fields.INCLUDE_GLOBAL_STATE, entry.includeGlobalState()); + builder.field(Fields.PARTIAL, entry.partial()); builder.field(Fields.STATE, entry.state()); builder.startArray(Fields.INDICES); { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index 54c014fb4edf3..132e46b1e94f4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -34,11 +34,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Arrays; -import java.util.Collection; +import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -67,7 +68,7 @@ public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, Clus } public void deleteIndices(final Request request, final Listener userListener) { - Collection indices = Arrays.asList(request.indices); + Set indices = Sets.newHashSet(request.indices); final DeleteIndexListener listener = new DeleteIndexListener(userListener); clusterService.submitStateUpdateTask("delete-index " + indices, new ClusterStateUpdateTask(Priority.URGENT) { @@ -84,6 +85,9 @@ public void onFailure(String source, Throwable t) { @Override public ClusterState execute(final ClusterState currentState) { + // Check if index deletion conflicts with any running snapshots + SnapshotsService.checkIndexDeletion(currentState, indices); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData()); ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks()); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 6639f9bdbd60a..121065bc638ea 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -19,14 +19,12 @@ package org.elasticsearch.cluster.metadata; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -39,8 +37,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.SnapshotsService; import java.util.ArrayList; import java.util.Arrays; @@ -99,27 +98,10 @@ public ClusterState execute(ClusterState currentState) { return currentState; } - // Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index - // is found as closing an index that is being restored makes the index unusable (it cannot be recovered). - RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE); - if (restore != null) { - Set indicesToFail = null; - for (RestoreInProgress.Entry entry : restore.entries()) { - for (ObjectObjectCursor shard : entry.shards()) { - if (!shard.value.state().completed()) { - if (indicesToClose.contains(shard.key.getIndexName())) { - if (indicesToFail == null) { - indicesToFail = new HashSet<>(); - } - indicesToFail.add(shard.key.getIndexName()); - } - } - } - } - if (indicesToFail != null) { - throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail); - } - } + // Check if index closing conflicts with any running restores + RestoreService.checkIndexClosing(currentState, indicesToClose); + // Check if index closing conflicts with any running snapshots + SnapshotsService.checkIndexClosing(currentState, indicesToClose); logger.info("closing indices [{}]", indicesAsString); diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index c6f189ea8a2de..65fb88d4b6454 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -774,6 +774,32 @@ private boolean failed(Snapshot snapshot, String index) { return false; } + /** + * Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index + * is found as closing an index that is being restored makes the index unusable (it cannot be recovered). + */ + public static void checkIndexClosing(ClusterState currentState, Set indices) { + RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE); + if (restore != null) { + Set indicesToFail = null; + for (RestoreInProgress.Entry entry : restore.entries()) { + for (ObjectObjectCursor shard : entry.shards()) { + if (!shard.value.state().completed()) { + if (indices.contains(shard.key.getIndexName())) { + if (indicesToFail == null) { + indicesToFail = new HashSet<>(); + } + indicesToFail.add(shard.key.getIndexName()); + } + } + } + } + if (indicesToFail != null) { + throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail); + } + } + } + /** * Adds restore completion listener *

diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 909fb4a15695b..ae3fb9a39401f 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -206,7 +206,7 @@ public ClusterState execute(ClusterState currentState) { // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndices(currentState, request.indicesOptions(), request.indices())); logger.trace("[{}][{}] creating snapshot for indices [{}]", request.repository(), request.name(), indices); - newSnapshot = new SnapshotsInProgress.Entry(snapshotId, request.includeGlobalState(), State.INIT, indices, System.currentTimeMillis(), null); + newSnapshot = new SnapshotsInProgress.Entry(snapshotId, request.includeGlobalState(), request.partial(), State.INIT, indices, System.currentTimeMillis(), null); snapshots = new SnapshotsInProgress(newSnapshot); } else { // TODO: What should we do if a snapshot is already running? @@ -228,7 +228,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new Runnable() { @Override public void run() { - beginSnapshot(newState, newSnapshot, request.partial, listener); + beginSnapshot(newState, newSnapshot, request.partial(), listener); } }); } @@ -1061,6 +1061,63 @@ private ImmutableOpenMap shard return builder.build(); } + /** + * Check if any of the indices to be deleted are currently being snapshotted. Fail as deleting an index that is being + * snapshotted (with partial == false) makes the snapshot fail. + */ + public static void checkIndexDeletion(ClusterState currentState, Set indices) { + Set indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices); + if (indicesToFail != null) { + throw new IllegalArgumentException("Cannot delete indices that are being snapshotted: " + indicesToFail + + ". Try again after snapshot finishes or cancel the currently running snapshot."); + } + } + + /** + * Check if any of the indices to be closed are currently being snapshotted. Fail as closing an index that is being + * snapshotted (with partial == false) makes the snapshot fail. + */ + public static void checkIndexClosing(ClusterState currentState, Set indices) { + Set indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices); + if (indicesToFail != null) { + throw new IllegalArgumentException("Cannot close indices that are being snapshotted: " + indicesToFail + + ". Try again after snapshot finishes or cancel the currently running snapshot."); + } + } + + private static Set indicesToFailForCloseOrDeletion(ClusterState currentState, Set indices) { + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + Set indicesToFail = null; + if (snapshots != null) { + for (final SnapshotsInProgress.Entry entry : snapshots.entries()) { + if (entry.partial() == false) { + if (entry.state() == State.INIT) { + for (String index : entry.indices()) { + if (indices.contains(index)) { + if (indicesToFail == null) { + indicesToFail = new HashSet<>(); + } + indicesToFail.add(index); + } + } + } else { + for (ObjectObjectCursor shard : entry.shards()) { + if (!shard.value.state().completed()) { + if (indices.contains(shard.key.getIndexName())) { + if (indicesToFail == null) { + indicesToFail = new HashSet<>(); + } + indicesToFail.add(shard.key.getIndexName()); + } + } + } + } + } + } + } + return indicesToFail; + } + /** * Adds snapshot completion listener * @@ -1302,6 +1359,15 @@ public boolean includeGlobalState() { return includeGlobalState; } + /** + * Returns true if partial snapshot should be allowed + * + * @return true if partial snapshot should be allowed + */ + public boolean partial() { + return partial; + } + /** * Returns master node timeout * diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index addb753ff4a52..c158fb1c3607a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -639,6 +639,7 @@ public ClusterState.Custom randomCreate(String name) { return new SnapshotsInProgress(new SnapshotsInProgress.Entry( new SnapshotId(randomName("repo"), randomName("snap")), randomBoolean(), + randomBoolean(), SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)), Collections.emptyList(), Math.abs(randomLong()), diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 9fb2b0f998974..a8a45e6a42f4e 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1813,19 +1813,31 @@ public void testRecreateBlocksOnRestore() throws Exception { } } - public void testDeleteIndexDuringSnapshot() throws Exception { + public void testCloseOrDeleteIndexDuringSnapshot() throws Exception { Client client = client(); boolean allowPartial = randomBoolean(); - logger.info("--> creating repository"); - assertAcked(client.admin().cluster().preparePutRepository("test-repo") + + // only block on repo init if we have partial snapshot or we run into deadlock when acquiring shard locks for index deletion/closing + boolean initBlocking = allowPartial || randomBoolean(); + if (initBlocking) { + assertAcked(client.admin().cluster().preparePutRepository("test-repo") .setType("mock").setSettings(Settings.settingsBuilder() - .put("location", randomRepoPath()) - .put("compress", randomBoolean()) - .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - .put("block_on_init", true) + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put("block_on_init", true) )); + } else { + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings(Settings.settingsBuilder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put("block_on_data", true) + )); + } createIndex("test-idx-1", "test-idx-2", "test-idx-3"); ensureGreen(); @@ -1843,25 +1855,61 @@ public void testDeleteIndexDuringSnapshot() throws Exception { logger.info("--> snapshot allow partial {}", allowPartial); ListenableActionFuture future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") - .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute(); + .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute(); logger.info("--> wait for block to kick in"); - waitForBlock(internalCluster().getMasterName(), "test-repo", TimeValue.timeValueMinutes(1)); - logger.info("--> delete some indices while snapshot is running"); - client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get(); - logger.info("--> unblock running master node"); - unblockNode(internalCluster().getMasterName()); + if (initBlocking) { + waitForBlock(internalCluster().getMasterName(), "test-repo", TimeValue.timeValueMinutes(1)); + } else { + waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); + } + if (allowPartial) { + // partial snapshots allow close / delete operations + if (randomBoolean()) { + logger.info("--> delete index while partial snapshot is running"); + client.admin().indices().prepareDelete("test-idx-1").get(); + } else { + logger.info("--> close index while partial snapshot is running"); + client.admin().indices().prepareClose("test-idx-1").get(); + } + } else { + // non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed + if (randomBoolean()) { + try { + logger.info("--> delete index while non-partial snapshot is running"); + client.admin().indices().prepareDelete("test-idx-1").get(); + fail("Expected deleting index to fail during snapshot"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("Cannot delete indices that are being snapshotted: [test-idx-1]")); + } + } else { + try { + logger.info("--> close index while non-partial snapshot is running"); + client.admin().indices().prepareClose("test-idx-1").get(); + fail("Expected closing index to fail during snapshot"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [test-idx-1]")); + } + } + } + if (initBlocking) { + logger.info("--> unblock running master node"); + unblockNode(internalCluster().getMasterName()); + } else { + logger.info("--> unblock all data nodes"); + unblockAllDataNodes("test-repo"); + } logger.info("--> waiting for snapshot to finish"); CreateSnapshotResponse createSnapshotResponse = future.get(); if (allowPartial) { - logger.info("Deleted index during snapshot, but allow partial"); + logger.info("Deleted/Closed index during snapshot, but allow partial"); assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.PARTIAL))); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); assertThat(createSnapshotResponse.getSnapshotInfo().failedShards(), greaterThan(0)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), lessThan(createSnapshotResponse.getSnapshotInfo().totalShards())); } else { - logger.info("Deleted index during snapshot and doesn't allow partial"); - assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.FAILED))); + logger.info("Snapshot successfully completed"); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS))); } } @@ -1960,7 +2008,7 @@ public ClusterState execute(ClusterState currentState) { shards.put(new ShardId("test-idx", "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED)); shards.put(new ShardId("test-idx", "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED)); List entries = new ArrayList<>(); - entries.add(new Entry(new SnapshotId("test-repo", "test-snap"), true, State.ABORTED, Collections.singletonList("test-idx"), System.currentTimeMillis(), shards.build())); + entries.add(new Entry(new SnapshotId("test-repo", "test-snap"), true, false, State.ABORTED, Collections.singletonList("test-idx"), System.currentTimeMillis(), shards.build())); return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))).build(); } diff --git a/docs/reference/migration/migrate_5_0.asciidoc b/docs/reference/migration/migrate_5_0.asciidoc index 6fcf566fdcb37..898b79ad487f6 100644 --- a/docs/reference/migration/migrate_5_0.asciidoc +++ b/docs/reference/migration/migrate_5_0.asciidoc @@ -21,6 +21,7 @@ your application to Elasticsearch 5.0. * <> * <> * <> +* <> [[breaking_50_search_changes]] === Warmers @@ -844,3 +845,12 @@ distributed document frequencies anymore. The option to disable the security manager `--security.manager.enabled` has been removed. In order to grant special permissions to elasticsearch users must tweak the local Java Security Policy. + +[[breaking_50_snapshot_restore]] +=== Snapshot/Restore + +==== Closing / deleting indices while running snapshot + +In previous versions of Elasticsearch, closing or deleting an index during a full snapshot would make the snapshot fail. This is now changed +by failing the close/delete index request instead. The behavior for partial snapshots remains unchanged: Closing or deleting an index during +a partial snapshot is still possible. The snapshot result is then marked as partial.