Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Close Index API] Add unique UUID to ClusterBlock #36775

Merged
merged 17 commits into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.block;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand All @@ -36,7 +37,7 @@
public class ClusterBlock implements Streamable, ToXContentFragment {

private int id;
private String uuid;
private @Nullable String uuid;
private String description;
private EnumSet<ClusterBlockLevel> levels;
private boolean retryable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -159,6 +160,19 @@ public boolean hasIndexBlockWithId(String index, int blockId) {
return false;
}

@Nullable
public ClusterBlock getIndexBlockWithId(final String index, final int blockId) {
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
if (clusterBlocks != null) {
for (ClusterBlock clusterBlock : clusterBlocks) {
if (clusterBlock.id() == blockId) {
return clusterBlock;
}
}
}
return null;
}

public void globalBlockedRaiseException(ClusterBlockLevel level) throws ClusterBlockException {
ClusterBlockException blockException = globalBlockedException(level);
if (blockException != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
Expand All @@ -68,6 +69,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -147,9 +149,30 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta
.execute(new WaitForClosedBlocksApplied(blockedIndices, timeout,
ActionListener.wrap(results ->
clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) {

boolean acknowledged = true;

@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results);
// Combine the results of the verify shards before close actions with the cluster state changes
// to determine if the current close action effectively closed all indices.
for (Map.Entry<Index, AcknowledgedResponse> result : results.entrySet()) {
IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey());
if (updatedMetaData != null) {
if (result.getValue().isAcknowledged()) {
if (updatedMetaData.getState() == IndexMetaData.State.CLOSE) {
IndexMetaData previousMetaData = currentState.metaData().index(result.getKey());
if (previousMetaData != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how could this ever be null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't, I tend to be too defensive on NPEs.

acknowledged = (previousMetaData.getState() == IndexMetaData.State.OPEN);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow this logic here. Wouldn't it be much simpler just to say. If the index is closed at this point, all is good. If not, something must have gone wrong, thereby acked == false.

This amounts to the following simple logic:

for (Map.Entry<Index, AcknowledgedResponse> result : results.entrySet()) {
    IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey());
    if (updatedMetaData != null && updatedMetaData.getState() != IndexMetaData.State.CLOSE) {
        acknowledged = false;
        break;
    }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, let's do that.

}
}
} else {
acknowledged = false;
break;
}
}
}
return allocationService.reroute(updatedState, "indices closed");
}

Expand All @@ -161,7 +184,6 @@ public void onFailure(final String source, final Exception e) {
@Override
public void clusterStateProcessed(final String source,
final ClusterState oldState, final ClusterState newState) {
boolean acknowledged = results.values().stream().allMatch(AcknowledgedResponse::isAcknowledged);
listener.onResponse(new AcknowledgedResponse(acknowledged));
}
}),
Expand Down Expand Up @@ -242,10 +264,10 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final Map<Index,
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
routingTable.remove(index.getName());
indexBlock = INDEX_CLOSED_BLOCK;
}
if (indexBlock == null) {
} else if (indexBlock == null) {
// Create a new index closed block
indexBlock = createIndexClosedBlock();
indexBlock = createIndexClosingBlock();
assert Strings.hasLength(indexBlock.uuid()) : "Closing block should have a UUID";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assertion should also cover the case where indexBlock was not previously null, i.e., rewrite this as:

} else {
  if (indexBlock == null) {
    indexBlock = createIndexClosingBlock();
  }
  assert Strings.hasLength(indexBlock.uuid()) : "Closing block should have a UUID";
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

}
blocks.addIndexBlock(index.getName(), indexBlock);
blockedIndices.put(index, indexBlock);
Expand Down Expand Up @@ -384,27 +406,26 @@ static ClusterState closeRoutingTable(final ClusterState currentState,
final boolean acknowledged = result.getValue().isAcknowledged();
try {
if (acknowledged == false) {
logger.debug("closing index {} failed", index);
logger.debug("verification of shards before closing {} failed", index);
continue;
}
final IndexMetaData indexMetaData = metadata.getSafe(index);
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
logger.debug("closing index {} succeed but index is already closed", index);
logger.debug("verification of shards before closing {} succeeded but index is already closed", index);
assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
continue;
}
final ClusterBlock closingBlock = blockedIndices.get(index);
if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) {
logger.debug("closing index {} succeed but block has been removed in the mean time", index);
logger.debug("verification of shards before closing {} succeeded but block has been removed in the meantime", index);
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why has it succeeded here? The index is not closed and the block has disappeared, so it must have been reopened in the meanwhile. This should be counted as a failure, not a success.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the message in the log is confusing. What succeeded here is the verification of shards using the block that has now disappeared. I changed to change the log message.

This should be counted as a failure, not a success.

I agree. I pushed a change that combines the results of the TransportVerifyShardBeforeCloseAction and the cluster state changes in order to determine if the current close action has indeed closed the index.

}

logger.debug("closing index {} succeed", index);
logger.debug("closing index {} succeeded", index);
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID).addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
routingTable.remove(index.getName());
closedIndices.add(index.getName());

} catch (final IndexNotFoundException e) {
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
}
Expand Down Expand Up @@ -540,12 +561,10 @@ private static int getTotalShardCount(ClusterState state, Index index) {
* @return Generates a {@link ClusterBlock} that blocks read and write operations on soon-to-be-closed indices. The
* cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID.
*/
public static ClusterBlock createIndexClosedBlock() {
final String description = "Index is blocked due to on-going index closing operation. Note that the closing process can take " +
"time and writes operations are blocked in the meantime. Execute an open index request to unblock the index and allow writes " +
"operation again. Execute a new close index request will try to close the index again.";
return new ClusterBlock(INDEX_CLOSED_BLOCK_ID, UUIDs.randomBase64UUID(), description , false, false, false,
RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
public static ClusterBlock createIndexClosingBlock() {
return new ClusterBlock(INDEX_CLOSED_BLOCK_ID, UUIDs.randomBase64UUID(), "index preparing to close. Reopen the index to allow " +
"writes again or retry closing the index to fully close the index.", false, false, false, RestStatus.FORBIDDEN,
EnumSet.of(ClusterBlockLevel.WRITE));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void setUp() throws Exception {

clusterService = createClusterService(threadPool);

clusterBlock = MetaDataIndexStateService.createIndexClosedBlock();
clusterBlock = MetaDataIndexStateService.createIndexClosingBlock();
setState(clusterService, new ClusterState.Builder(clusterService.state())
.blocks(ClusterBlocks.builder().blocks(clusterService.state().blocks()).addIndexBlock("index", clusterBlock).build()).build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

import static java.util.EnumSet.copyOf;
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.nullValue;

public class ClusterBlockTests extends ESTestCase {
Expand Down Expand Up @@ -116,16 +117,16 @@ public void testGlobalBlocksCheckedIfNoIndicesSpecified() {
public void testRemoveIndexBlockWithId() {
final ClusterBlocks.Builder builder = ClusterBlocks.builder();
builder.addIndexBlock("index-1",
new ClusterBlock(1, "uuid", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL)));
new ClusterBlock(1, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
builder.addIndexBlock("index-1",
new ClusterBlock(2, "uuid", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL)));
new ClusterBlock(2, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
builder.addIndexBlock("index-1",
new ClusterBlock(3, "uuid", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL)));
new ClusterBlock(3, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
builder.addIndexBlock("index-1",
new ClusterBlock(3, "other uuid", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL)));
new ClusterBlock(3, "other uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));

builder.addIndexBlock("index-2",
new ClusterBlock(3, "uuid3", "", true, true, true, RestStatus.OK, EnumSet.copyOf(ClusterBlockLevel.ALL)));
new ClusterBlock(3, "uuid3", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));

ClusterBlocks clusterBlocks = builder.build();
assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(4));
Expand All @@ -150,6 +151,21 @@ public void testRemoveIndexBlockWithId() {
assertThat(clusterBlocks.hasIndexBlockWithId("index-2", 3), is(false));
}

public void testGetIndexBlockWithId() {
final int blockId = randomInt();
final ClusterBlock[] clusterBlocks = new ClusterBlock[randomIntBetween(1, 5)];

final ClusterBlocks.Builder builder = ClusterBlocks.builder();
for (int i = 0; i < clusterBlocks.length; i++) {
clusterBlocks[i] = new ClusterBlock(blockId, "uuid" + i, "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL));
builder.addIndexBlock("index", clusterBlocks[i]);
}

assertThat(builder.build().indices().get("index").size(), equalTo(clusterBlocks.length));
assertThat(builder.build().getIndexBlockWithId("index", blockId), isOneOf(clusterBlocks));
assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, ESTestCase::randomInt)), nullValue());
}

private ClusterBlock randomClusterBlock() {
return randomClusterBlock(randomVersion(random()));
}
Expand All @@ -158,7 +174,7 @@ private ClusterBlock randomClusterBlock(final Version version) {
final String uuid = (version.onOrAfter(Version.V_7_0_0) && randomBoolean()) ? UUIDs.randomBase64UUID() : null;
final List<ClusterBlockLevel> levels = Arrays.asList(ClusterBlockLevel.values());
return new ClusterBlock(randomInt(), uuid, "cluster block #" + randomInt(), randomBoolean(), randomBoolean(), randomBoolean(),
randomFrom(RestStatus.values()), EnumSet.copyOf(randomSubsetOf(randomIntBetween(1, levels.size()), levels)));
randomFrom(RestStatus.values()), copyOf(randomSubsetOf(randomIntBetween(1, levels.size()), levels)));
}

private void assertClusterBlockEquals(final ClusterBlock expected, final ClusterBlock actual) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testCloseRoutingTable() {
state = addOpenedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
nonBlockedIndices.add(state.metaData().index(indexName).getIndex());
} else {
final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosedBlock();
final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock();
state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock);
blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock);
results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void testConcurrentClose() throws InterruptedException {
throw new AssertionError(e);
}
try {
assertAcked(client().admin().indices().prepareClose(indexName));
client().admin().indices().prepareClose(indexName).get();
} catch (final Exception e) {
assertException(e, indexName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testReopenDuringClose() throws Exception {
final String indexName = "test";
createIndexWithDocs(indexName);

ensureClusterSizeConsistency();
ensureYellowAndNoInitializingShards(indexName);

final CountDownLatch block = new CountDownLatch(1);
final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(indexName, block::countDown);
Expand All @@ -75,7 +75,7 @@ public void testReopenDuringClose() throws Exception {
assertAcked(client().admin().indices().prepareOpen(indexName));

releaseBlock.close();
closeIndexResponse.get();
assertFalse(closeIndexResponse.get().isAcknowledged());
assertIndexIsOpened(indexName);
}

Expand All @@ -86,7 +86,7 @@ public void testReopenDuringCloseOnMultipleIndices() throws Exception {
createIndexWithDocs(indices.get(i));
}

ensureClusterSizeConsistency();
ensureYellowAndNoInitializingShards(indices.toArray(Strings.EMPTY_ARRAY));

final CountDownLatch block = new CountDownLatch(1);
final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(randomFrom(indices), block::countDown);
Expand All @@ -100,7 +100,7 @@ public void testReopenDuringCloseOnMultipleIndices() throws Exception {
assertAcked(client().admin().indices().prepareOpen(reopenedIndices.toArray(Strings.EMPTY_ARRAY)));

releaseBlock.close();
closeIndexResponse.get();
assertFalse(closeIndexResponse.get().isAcknowledged());

indices.forEach(index -> {
if (reopenedIndices.contains(index)) {
Expand Down