Skip to content

Commit

Permalink
Add validation for supported index version on node join, restore, upg…
Browse files Browse the repository at this point in the history
…rade & open index (elastic#21830)

Today we can easily join a cluster that holds an index we don't support since
we currently allow rolling upgrades from 5.x to 6.x. Along the same lines we don't check if we can support an index based on the nodes in the cluster when we open, restore or metadata-upgrade and index. This commit adds
additional safety that fails cluster state validation, open, restore and /or upgrade if there is an open index with an incompatible index version created in the cluster.

Realtes to elastic#21670
  • Loading branch information
s1monw committed Dec 1, 2016
1 parent 38fc958 commit 91d5a95
Show file tree
Hide file tree
Showing 29 changed files with 309 additions and 56 deletions.
36 changes: 32 additions & 4 deletions core/src/main/java/org/elasticsearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,26 @@ public static void writeVersion(Version version, StreamOutput out) throws IOExce
}

/**
* Returns the smallest version between the 2.
* Returns the minimum version between the 2.
*/
public static Version smallest(Version version1, Version version2) {
public static Version min(Version version1, Version version2) {
return version1.id < version2.id ? version1 : version2;
}

/**
* Returns the minimum version between the 2.
* @deprecated use {@link #min(Version, Version)} instead
*/
@Deprecated
public static Version smallest(Version version1, Version version2) {
return min(version1, version2);
}

/**
* Returns the maximum version between the 2
*/
public static Version max(Version version1, Version version2) { return version1.id > version2.id ? version1 : version2; }

/**
* Returns the version given its string representation, current version if the argument is null or empty
*/
Expand Down Expand Up @@ -312,7 +326,22 @@ public boolean onOrBefore(Version version) {
* is a beta or RC release then the version itself is returned.
*/
public Version minimumCompatibilityVersion() {
return Version.smallest(this, fromId(major * 1000000 + 99));
return Version.min(this, fromId(major * 1000000 + 99));
}

/**
* Returns the minimum created index version that this version supports. Indices created with lower versions
* can't be used with this version.
*/
public Version minimumIndexCompatibilityVersion() {
final int bwcMajor;
if (major == 5) {
bwcMajor = 2; // we jumped from 2 to 5
} else {
bwcMajor = major - 1;
}
final int bwcMinor = 0;
return Version.min(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
}

@SuppressForbidden(reason = "System.out.*")
Expand Down Expand Up @@ -389,5 +418,4 @@ public boolean isRC() {
public boolean isRelease() {
return build == 99;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {

if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
DiscoveryNodes nodes = currentState.nodes();
final Version createdVersion = Version.smallest(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
final Version createdVersion = Version.min(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
Expand Down Expand Up @@ -160,12 +161,14 @@ public ClusterState execute(ClusterState currentState) {
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
.blocks(currentState.blocks());
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
.minimumIndexCompatibilityVersion();
for (IndexMetaData closedMetaData : indicesToOpen) {
final String indexName = closedMetaData.getIndex().getName();
IndexMetaData indexMetaData = IndexMetaData.builder(closedMetaData).state(IndexMetaData.State.OPEN).build();
// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData, minIndexCompatibilityVersion);
try {
indicesService.verifyIndexMetadata(indexMetaData, indexMetaData);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ public MetaDataIndexUpgradeService(Settings settings, MapperRegistry mapperRegis
* If the index does not need upgrade it returns the index metadata unchanged, otherwise it returns a modified index metadata. If index
* cannot be updated the method throws an exception.
*/
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) {
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
// Throws an exception if there are too-old segments:
if (isUpgraded(indexMetaData)) {
assert indexMetaData == archiveBrokenIndexSettings(indexMetaData) : "all settings must have been upgraded before";
return indexMetaData;
}
checkSupportedVersion(indexMetaData);
checkSupportedVersion(indexMetaData, minimumIndexCompatibilityVersion);
IndexMetaData newMetaData = indexMetaData;
// we have to run this first otherwise in we try to create IndexSettings
// with broken settings and fail in checkMappingsCompatibility
Expand All @@ -92,21 +92,26 @@ boolean isUpgraded(IndexMetaData indexMetaData) {
}

/**
* Elasticsearch 5.0 no longer supports indices with pre Lucene v5.0 (Elasticsearch v2.0.0.beta1) segments. All indices
* that were created before Elasticsearch v2.0.0.beta1 should be reindexed in Elasticsearch 2.x
* before they can be opened by this version of elasticsearch. */
private void checkSupportedVersion(IndexMetaData indexMetaData) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData) == false) {
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v2.0.0.beta1."
+ " It should be reindexed in Elasticsearch 2.x before upgrading to " + Version.CURRENT + ".");
* Elasticsearch v6.0 no longer supports indices created pre v5.0. All indices
* that were created before Elasticsearch v5.0 should be re-indexed in Elasticsearch 5.x
* before they can be opened by this version of elasticsearch.
*/
private void checkSupportedVersion(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData,
minimumIndexCompatibilityVersion) == false) {
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created with version ["
+ indexMetaData.getCreationVersion() + "] but the minimum compatible version is ["

+ minimumIndexCompatibilityVersion + "]. It should be re-indexed in Elasticsearch " + minimumIndexCompatibilityVersion.major
+ ".x before upgrading to " + Version.CURRENT + ".");
}
}

/*
* Returns true if this index can be supported by the current version of elasticsearch
*/
private static boolean isSupportedVersion(IndexMetaData indexMetaData) {
return indexMetaData.getCreationVersion().onOrAfter(Version.V_2_0_0_beta1);
private static boolean isSupportedVersion(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
return indexMetaData.getCreationVersion().onOrAfter(minimumIndexCompatibilityVersion);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,22 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
private final String masterNodeId;
private final String localNodeId;
private final Version minNonClientNodeVersion;
private final Version maxNodeVersion;
private final Version minNodeVersion;

private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes,
ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes,
String masterNodeId, String localNodeId, Version minNonClientNodeVersion) {
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion,
Version minNodeVersion) {
this.nodes = nodes;
this.dataNodes = dataNodes;
this.masterNodes = masterNodes;
this.ingestNodes = ingestNodes;
this.masterNodeId = masterNodeId;
this.localNodeId = localNodeId;
this.minNonClientNodeVersion = minNonClientNodeVersion;
this.minNodeVersion = minNodeVersion;
this.maxNodeVersion = maxNodeVersion;
}

@Override
Expand Down Expand Up @@ -236,6 +241,24 @@ public Version getSmallestNonClientNodeVersion() {
return minNonClientNodeVersion;
}

/**
* Returns the version of the node with the oldest version in the cluster.
*
* @return the oldest version in the cluster
*/
public Version getMinNodeVersion() {
return minNodeVersion;
}

/**
* Returns the version of the node with the yougest version in the cluster
*
* @return the oldest version in the cluster
*/
public Version getMaxNodeVersion() {
return maxNodeVersion;
}

/**
* Resolve a node with a given id
*
Expand Down Expand Up @@ -642,25 +665,27 @@ public DiscoveryNodes build() {
ImmutableOpenMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
Version minNodeVersion = Version.CURRENT;
Version maxNodeVersion = Version.CURRENT;
Version minNonClientNodeVersion = Version.CURRENT;
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
if (nodeEntry.value.isDataNode()) {
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.getVersion());
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isMasterNode()) {
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.getVersion());
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
}
if (nodeEntry.value.isIngestNode()) {
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
}
minNodeVersion = Version.smallest(minNodeVersion, nodeEntry.value.getVersion());
minNodeVersion = Version.min(minNodeVersion, nodeEntry.value.getVersion());
maxNodeVersion = Version.max(maxNodeVersion, nodeEntry.value.getVersion());
}

return new DiscoveryNodes(
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
masterNodeId, localNodeId, minNonClientNodeVersion
masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@

package org.elasticsearch.discovery.zen;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
Expand All @@ -37,6 +39,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
*
Expand All @@ -61,21 +64,20 @@ public interface MembershipListener {

private final TransportService transportService;

private final DiscoveryNodesProvider nodesProvider;

private final MembershipListener listener;

public MembershipAction(Settings settings, TransportService transportService,
DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
Supplier<DiscoveryNode> localNodeSupplier, MembershipListener listener) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;


transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new,
ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new ValidateJoinRequest(localNodeSupplier), ThreadPool.Names.GENERIC,
new ValidateJoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
}
Expand Down Expand Up @@ -155,20 +157,23 @@ public void onFailure(Exception e) {
}
}

class ValidateJoinRequest extends TransportRequest {
static class ValidateJoinRequest extends TransportRequest {
private final Supplier<DiscoveryNode> localNode;
private ClusterState state;

ValidateJoinRequest() {
ValidateJoinRequest(Supplier<DiscoveryNode> localNode) {
this.localNode = localNode;
}

ValidateJoinRequest(ClusterState state) {
this.state = state;
this.localNode = state.nodes()::getLocalNode;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
this.state = ClusterState.Builder.readFrom(in, localNode.get());
}

@Override
Expand All @@ -178,15 +183,31 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {

@Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
ensureIndexCompatibility(Version.CURRENT.minimumIndexCompatibilityVersion(), request.state.getMetaData());
// for now, the mere fact that we can serialize the cluster state acts as validation....
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}

/**
* Ensures that all indices are compatible with the supported index version.
* @throws IllegalStateException if any index is incompatible with the given version
*/
static void ensureIndexCompatibility(final Version supportedIndexVersion, MetaData metaData) {
// we ensure that all indices in the cluster we join are compatible with us no matter if they are
// closed or not we can't read mappings of these indices so we need to reject the join...
for (IndexMetaData idxMetaData : metaData) {
if (idxMetaData.getCreationVersion().before(supportedIndexVersion)) {
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
+ idxMetaData.getCreationVersion() + " minimum compatible index version is: " + supportedIndexVersion);
}
}
}

public static class LeaveRequest extends TransportRequest {

private DiscoveryNode node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,6 @@ class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
@Override
public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder();

final DiscoveryNodes currentNodes = currentState.nodes();
boolean nodesChanged = false;
ClusterState.Builder newState;
Expand All @@ -434,8 +433,10 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov

assert nodesBuilder.isLocalNodeElectedMaster();

Version minNodeVersion = Version.CURRENT;
// processing any joins
for (final DiscoveryNode node : joiningNodes) {
minNodeVersion = Version.min(minNodeVersion, node.getVersion());
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
// noop
} else if (currentNodes.nodeExists(node)) {
Expand All @@ -451,7 +452,9 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
}
results.success(node);
}

// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
MembershipAction.ensureIndexCompatibility(minNodeVersion.minimumIndexCompatibilityVersion(), currentState.getMetaData());
if (nodesChanged) {
newState.nodes(nodesBuilder);
return results.build(allocationService.reroute(newState.build(), "node_join"));
Expand Down
Loading

0 comments on commit 91d5a95

Please sign in to comment.