From 155a363815876d5cc5d277082c06d6053d5b20b3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 27 Feb 2019 17:05:50 -0500 Subject: [PATCH 1/6] Add BWC for retention leases We have to handle the case of a < 6.7.0 and >= 6.7.0 mixed cluster wherein a primary on >= 6.7.0 would otherwise send retention leases to a < 6.7.0 node which would not understand them. This commit adds BWC for this case, and adds a test to ensure that we behave properly here. --- .../gradle/test/ClusterFormationTasks.groovy | 4 +- qa/retention-lease-bwc/build.gradle | 63 +++++ .../AddRetentionLeasePlugin.java | 51 ++++ .../RestAddRetentionLeaseHandler.java | 103 ++++++++ .../RetentionLeaseBwcIT.java | 245 ++++++++++++++++++ .../RetentionLeaseBackgroundSyncAction.java | 15 ++ .../index/seqno/RetentionLeaseSyncAction.java | 15 ++ 7 files changed, 495 insertions(+), 1 deletion(-) create mode 100644 qa/retention-lease-bwc/build.gradle create mode 100644 qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/AddRetentionLeasePlugin.java create mode 100644 qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/RestAddRetentionLeaseHandler.java create mode 100644 qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy index 83ff5f217e32b..bd0c9d3df4aed 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy @@ -130,7 +130,9 @@ class ClusterFormationTasks { if (esConfig.containsKey('discovery.zen.hosts_provider') == false) { esConfig['discovery.zen.hosts_provider'] = 'file' } - esConfig['discovery.zen.ping.unicast.hosts'] = [] + if (esConfig.containsKey('discovery.zen.ping.unicast.hosts') == false) { + esConfig['discovery.zen.ping.unicast.hosts'] = [] + } esConfig } dependsOn = startDependencies diff --git a/qa/retention-lease-bwc/build.gradle b/qa/retention-lease-bwc/build.gradle new file mode 100644 index 0000000000000..bb7abbf5ff18a --- /dev/null +++ b/qa/retention-lease-bwc/build.gradle @@ -0,0 +1,63 @@ +import org.elasticsearch.gradle.Version +import org.elasticsearch.gradle.test.RestIntegTestTask + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +apply plugin: 'elasticsearch.esplugin' + +esplugin { + description 'add retention lease plugin' + classname 'org.elasticsearch.retention_lease_bwc.AddRetentionLeasePlugin' +} + +integTest.enabled = false + +task oldClusterTest(type: RestIntegTestTask) { + mustRunAfter(precommit) +} + +oldClusterTestCluster { + numNodes = 2 + numBwcNodes = 2 + bwcVersion = Version.fromString("6.6.2-SNAPSHOT") + setting "cluster.name", "retention-lease-bwc" +} + +task newClusterTest(type: RestIntegTestTask) { + +} + +newClusterTestCluster { + dependsOn "oldClusterTestCluster#wait" + numNodes = 1 + plugin ":qa:retention-lease-bwc" + setting "discovery.zen.ping.unicast.hosts", "\"${-> oldClusterTest.nodes.get(0).transportUri()}\"" + setting "cluster.name", "retention-lease-bwc" + setting "node.name", "new-node" +} + +newClusterTestRunner { + finalizedBy "oldClusterTestCluster#node0.stop" + finalizedBy "oldClusterTestCluster#node1.stop" + finalizedBy "newClusterTestCluster#stop" +} + +check.dependsOn newClusterTest +unitTest.enabled = false diff --git a/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/AddRetentionLeasePlugin.java b/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/AddRetentionLeasePlugin.java new file mode 100644 index 0000000000000..f1cf8c5a4ec34 --- /dev/null +++ b/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/AddRetentionLeasePlugin.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.retention_lease_bwc; + +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; + +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +public class AddRetentionLeasePlugin extends Plugin implements ActionPlugin { + + @Override + public List getRestHandlers( + final Settings settings, + final RestController restController, + final ClusterSettings clusterSettings, + final IndexScopedSettings indexScopedSettings, + final SettingsFilter settingsFilter, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Supplier nodesInCluster) { + return Collections.singletonList(new RestAddRetentionLeaseHandler(settings, restController)); + } + +} diff --git a/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/RestAddRetentionLeaseHandler.java b/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/RestAddRetentionLeaseHandler.java new file mode 100644 index 0000000000000..cc0582cdf65db --- /dev/null +++ b/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/RestAddRetentionLeaseHandler.java @@ -0,0 +1,103 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.retention_lease_bwc; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestActionListener; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.Collections; + +public class RestAddRetentionLeaseHandler extends BaseRestHandler { + + public RestAddRetentionLeaseHandler(final Settings settings, final RestController restController) { + super(settings); + restController.registerHandler(RestRequest.Method.PUT, "/{index}/_add_retention_lease", this); + } + + @Override + public String getName() { + return "add_retention_lease"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final String index = request.param("index"); + final String id = request.param("id"); + final long retainingSequenceNumber = Long.parseLong(request.param("retaining_sequence_number")); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(index); + return channel -> + client.admin().cluster().state(clusterStateRequest, new ActionListener() { + @Override + public void onResponse(final ClusterStateResponse clusterStateResponse) { + final IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().index(index); + final int numberOfShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetaData.getSettings()); + + final GroupedActionListener listener = new GroupedActionListener<>( + new RestActionListener>(channel) { + + @Override + protected void processResponse( + final Collection responses) throws Exception { + final XContentBuilder builder = channel.newBuilder().startObject().endObject(); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + } + + }, numberOfShards, Collections.emptyList()); + for (int i = 0; i < numberOfShards; i++) { + final ShardId shardId = new ShardId(indexMetaData.getIndex(), i); + client.execute( + RetentionLeaseActions.Add.INSTANCE, + new RetentionLeaseActions.AddRequest(shardId, id, retainingSequenceNumber, "rest"), + listener); + } + } + + @Override + public void onFailure(final Exception e) { + try { + channel.sendResponse(new BytesRestResponse(channel, RestStatus.SERVICE_UNAVAILABLE, e)); + } catch (IOException inner) { + inner.addSuppressed(e); + throw new UncheckedIOException(inner); + } + } + }); + } +} diff --git a/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java b/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java new file mode 100644 index 0000000000000..7f9897e9a9f07 --- /dev/null +++ b/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java @@ -0,0 +1,245 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.add_retention_lease; + +import org.apache.http.HttpHost; +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +public class RetentionLeaseBwcIT extends ESRestTestCase { + + @SuppressForbidden(reason = "debug") + public void testRetentionLeaseBwcIT() throws IOException { + // we have to dance like this otherwise we can not end up with a primary on the new node with a replica on the old node + final Response getNodesResponse = client().performRequest(new Request("GET", "/_nodes")); + final ObjectPath getNodesObjectPath = ObjectPath.createFromResponse(getNodesResponse); + final Map nodesAsMap = getNodesObjectPath.evaluate("nodes"); + final List nodes = new ArrayList<>(); + for (final String id : nodesAsMap.keySet()) { + nodes.add(new Node( + id, + getNodesObjectPath.evaluate("nodes." + id + ".name"), + Version.fromString(getNodesObjectPath.evaluate("nodes." + id + ".version")), + HttpHost.create(getNodesObjectPath.evaluate("nodes." + id + ".http.publish_address")))); + } + final List oldNodes = + nodes.stream().filter(node -> node.version().before(Version.V_6_7_0)).collect(Collectors.toList()); + final Node newNode = + nodes.stream().filter(node -> node.version().onOrAfter(Version.V_6_7_0)).findFirst().get(); + // only allow shards on the old nodes + final Settings settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put( + "index.routing.allocation.include._name", + oldNodes.stream().map(Node::nodeName).collect(Collectors.joining(","))) + .build(); + final Request createIndexRequest = new Request("PUT", "/index"); + createIndexRequest.setJsonEntity(Strings.toString(settings)); + client().performRequest(createIndexRequest); + ensureYellow("index"); + // allow shards on all nodes + final Request removeRoutingAllocationIncludeNameRequest = new Request("PUT", "/index/_settings"); + final Settings removeRoutingAllocationIncludeNameSettings = + Settings.builder().putNull("index.routing.allocation.include._name").build(); + removeRoutingAllocationIncludeNameRequest.setJsonEntity(Strings.toString(removeRoutingAllocationIncludeNameSettings)); + client().performRequest(removeRoutingAllocationIncludeNameRequest); + ensureGreen("index"); + // move the primary to the new node by excluding it on the current old node + final List shards = getShards("index", nodes, client()); + final Request addRoutingAllocationExcludeNameRequest = new Request("PUT", "/index/_settings"); + final Settings addRoutingAllocationExcludeNameSettings = Settings.builder() + .put("index.routing.allocation.exclude._name", shards.stream().filter(Shard::primary).findFirst().get().node().nodeName()) + .build(); + addRoutingAllocationExcludeNameRequest.setJsonEntity(Strings.toString(addRoutingAllocationExcludeNameSettings)); + client().performRequest(addRoutingAllocationExcludeNameRequest); + ensureGreen("index"); + try (RestClient newClient = buildClient(Settings.EMPTY, new HttpHost[]{newNode.publishAddress()})) { + final Request addRetentionLeaseRequest = new Request("PUT", "/index/_add_retention_lease"); + addRetentionLeaseRequest.addParameter("id", "test-1"); + addRetentionLeaseRequest.addParameter("retaining_sequence_number", "-1"); + newClient.performRequest(addRetentionLeaseRequest); + + final Request statsRequest = new Request("GET", "/index/_stats"); + statsRequest.addParameter("level", "shards"); + final Response statsResponse = newClient.performRequest(statsRequest); + final ObjectPath statsObjectPath = ObjectPath.createFromResponse(statsResponse); + final ArrayList shardsStats = statsObjectPath.evaluate("indices.index.shards.0"); + assertThat(shardsStats, hasSize(2)); + boolean primaryFound = false; + for (final Object shardStats : shardsStats) { + @SuppressWarnings("unchecked") final Map shardStatsAsMap = (Map) shardStats; + @SuppressWarnings("unchecked") final Map routing = (Map) shardStatsAsMap.get("routing"); + if (Boolean.FALSE.equals(routing.get("primary"))) { + continue; + } + primaryFound = true; + @SuppressWarnings("unchecked") final Map retentionLeases = + (Map) shardStatsAsMap.get("retention_leases"); + @SuppressWarnings("unchecked") final ArrayList leases = (ArrayList) retentionLeases.get("leases"); + assertThat(leases, hasSize(1)); + @SuppressWarnings("unchecked") final Map lease = (Map) leases.get(0); + assertThat(lease.get("id"), equalTo("test-1")); + assertThat(lease.get("retaining_seq_no"), equalTo(0)); + assertThat(lease.get("source"), equalTo("rest")); + } + assertTrue(primaryFound); + } + + final int numberOfDocuments = randomIntBetween(1, 512); + for (int i = 0; i < numberOfDocuments; i++) { + final Request indexingRequest = new Request("PUT", "/index/_doc/" + i); + indexingRequest.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(8) + "\"}"); + assertOK(client().performRequest(indexingRequest)); + } + + final Request refreshRequest = new Request("POST", "/index/_refresh"); + assertOK(client().performRequest(refreshRequest)); + + assertCount("index", "_primary", numberOfDocuments); + assertCount("index", "_replica", numberOfDocuments); + } + + private void ensureYellow(final String index) throws IOException { + final Request request = new Request("GET", "/_cluster/health/" + index); + request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_no_relocating_shards", "true"); + request.addParameter("timeout", "30s"); + request.addParameter("level", "shards"); + client().performRequest(request); + } + + private List getShards(final String index, final List nodes, final RestClient client) throws IOException { + final Request request = new Request("GET", index + "/_stats"); + request.addParameter("level", "shards"); + final Response response = client.performRequest(request); + final List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0"); + final ArrayList shards = new ArrayList<>(); + for (Object shard : shardStats) { + final String nodeId = ObjectPath.evaluate(shard, "routing.node"); + final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); + final Node node = nodes.stream().filter(n -> n.id().equals(nodeId)).findFirst().get(); + shards.add(new Shard(node, primary)); + } + return shards; + } + + private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { + final Request request = new Request("GET", index + "/_count"); + request.addParameter("preference", preference); + final Response response = client().performRequest(request); + assertOK(response); + final int actualCount = Integer.parseInt(ObjectPath.createFromResponse(response).evaluate("count").toString()); + assertThat(actualCount, equalTo(expectedCount)); + } + + final class Node { + + private final String id; + + public String id() { + return id; + } + + private final String nodeName; + + public String nodeName() { + return nodeName; + } + + private final Version version; + + public Version version() { + return version; + } + + private final HttpHost publishAddress; + + public HttpHost publishAddress() { + return publishAddress; + } + + Node(final String id, final String nodeName, final Version version, final HttpHost publishAddress) { + this.id = id; + this.nodeName = nodeName; + this.version = version; + this.publishAddress = publishAddress; + } + + @Override + public String toString() { + return "Node{" + + "id='" + id + '\'' + + ", nodeName='" + nodeName + '\'' + + ", version=" + version + + ", publishAddress=" + publishAddress + + '}'; + } + } + + final class Shard { + + private final Node node; + + public Node node() { + return node; + } + + private final boolean primary; + + public boolean primary() { + return primary; + } + + Shard(final Node node, final boolean primary) { + this.node = node; + this.primary = primary; + } + + @Override + public String toString() { + return "Shard{" + + "node=" + node + + ", primary=" + primary + + '}'; + } + + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 4b8b73b145bd5..48c68c9f66a91 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -24,14 +24,17 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -130,6 +133,18 @@ protected PrimaryResult shardOperationOnPrimary( return new PrimaryResult<>(request, new ReplicationResponse()); } + @Override + protected void sendReplicaRequest( + final ConcreteReplicaRequest replicaRequest, + final DiscoveryNode node, + final ActionListener listener) { + if (node.getVersion().onOrAfter(Version.V_6_7_0)) { + super.sendReplicaRequest(replicaRequest, node, listener); + } else { + listener.onResponse(new ReplicaResponse(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED)); + } + } + @Override protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws IOException { Objects.requireNonNull(request); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 3e46b10d7e8be..76f6ff2574baf 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -24,16 +24,19 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -132,6 +135,18 @@ protected WritePrimaryResult shardOperationOnPrimary( return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger()); } + @Override + protected void sendReplicaRequest( + final ConcreteReplicaRequest replicaRequest, + final DiscoveryNode node, + final ActionListener listener) { + if (node.getVersion().onOrAfter(Version.V_6_7_0)) { + super.sendReplicaRequest(replicaRequest, node, listener); + } else { + listener.onResponse(new ReplicaResponse(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED)); + } + } + @Override protected WriteReplicaResult shardOperationOnReplica( final Request request, From e011571df5ff4df1fddfbddb66a839947b992521 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 27 Feb 2019 20:01:22 -0500 Subject: [PATCH 2/6] Remove hard-coded version --- qa/retention-lease-bwc/build.gradle | 85 +++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 22 deletions(-) diff --git a/qa/retention-lease-bwc/build.gradle b/qa/retention-lease-bwc/build.gradle index bb7abbf5ff18a..d58795b495624 100644 --- a/qa/retention-lease-bwc/build.gradle +++ b/qa/retention-lease-bwc/build.gradle @@ -29,35 +29,76 @@ esplugin { integTest.enabled = false -task oldClusterTest(type: RestIntegTestTask) { - mustRunAfter(precommit) +task bwcTest { + description = 'runs retention lease backwards compatability tests' + group = 'verification' } -oldClusterTestCluster { - numNodes = 2 - numBwcNodes = 2 - bwcVersion = Version.fromString("6.6.2-SNAPSHOT") - setting "cluster.name", "retention-lease-bwc" -} +for (Version version : bwcVersions.wireCompatible) { + if (version.before("6.5.0")) { + // versions before 6.5.0 do not support soft deletes + continue + } -task newClusterTest(type: RestIntegTestTask) { + final String baseName = "v${version}" -} + final Task oldClusterTest = tasks.create(name: "${baseName}#oldClusterTest", type: RestIntegTestTask) { + mustRunAfter(precommit) + includePackaged = false + } + + final Object oldClusterTestCluster = extensions.findByName("${baseName}#oldClusterTestCluster") + + configure(oldClusterTestCluster) { + numNodes = 2 + numBwcNodes = 2 + bwcVersion = version + setting "cluster.name", "retention-lease-bwc" + } + + final Task newClusterTest = tasks.create(name: "${baseName}#newClusterTest", type: RestIntegTestTask) { + + } + + final Object newClusterTestCluster = extensions.findByName("${baseName}#newClusterTestCluster") + + configure(newClusterTestCluster) { + dependsOn "${baseName}#oldClusterTestCluster#wait" + numNodes = 1 + plugin ":qa:retention-lease-bwc" + setting "discovery.zen.ping.unicast.hosts", "\"${-> oldClusterTest.nodes.get(0).transportUri()}\"" + setting "cluster.name", "retention-lease-bwc" + setting "node.name", "new-node" + } + + final Object newClusterTestRunner = tasks.findByName("${baseName}#newClusterTestRunner") + + configure(newClusterTestRunner) { + finalizedBy "${baseName}#oldClusterTestCluster#node0.stop" + finalizedBy "${baseName}#oldClusterTestCluster#node1.stop" + finalizedBy "${baseName}#newClusterTestCluster#stop" + } + + final Task versionBwcTest = tasks.create(name: "${baseName}#bwcTest") { + dependsOn newClusterTest + } -newClusterTestCluster { - dependsOn "oldClusterTestCluster#wait" - numNodes = 1 - plugin ":qa:retention-lease-bwc" - setting "discovery.zen.ping.unicast.hosts", "\"${-> oldClusterTest.nodes.get(0).transportUri()}\"" - setting "cluster.name", "retention-lease-bwc" - setting "node.name", "new-node" + if (project.bwc_tests_enabled) { + bwcTest.dependsOn(versionBwcTest) + } } -newClusterTestRunner { - finalizedBy "oldClusterTestCluster#node0.stop" - finalizedBy "oldClusterTestCluster#node1.stop" - finalizedBy "newClusterTestCluster#stop" +task bwcTestSnapshots { + if (project.bwc_tests_enabled) { + for (final def version : bwcVersions.unreleasedWireCompatible) { + // versions before 6.5.0 do not support soft deletes + if (version.before("6.5.0")) { + continue + } + dependsOn "v${version}#bwcTest" + } + } } -check.dependsOn newClusterTest +check.dependsOn bwcTestSnapshots unitTest.enabled = false From b2331765bea3793137c70bdadcde0112827e6ef0 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 27 Feb 2019 21:54:24 -0500 Subject: [PATCH 3/6] Update qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java --- .../elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java b/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java index 7f9897e9a9f07..c9a51f9978767 100644 --- a/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java +++ b/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java @@ -43,7 +43,6 @@ public class RetentionLeaseBwcIT extends ESRestTestCase { - @SuppressForbidden(reason = "debug") public void testRetentionLeaseBwcIT() throws IOException { // we have to dance like this otherwise we can not end up with a primary on the new node with a replica on the old node final Response getNodesResponse = client().performRequest(new Request("GET", "/_nodes")); From b6357eba9b1afbb16f2592ed9308ee69cc7884ca Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 27 Feb 2019 21:54:41 -0500 Subject: [PATCH 4/6] Update qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java --- .../elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java b/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java index c9a51f9978767..8cd9b8215c85d 100644 --- a/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java +++ b/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java @@ -26,7 +26,6 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.rest.ESRestTestCase; From c2d036fc02d4f734d6e82f7929bb102c8956ee7b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 27 Feb 2019 21:56:04 -0500 Subject: [PATCH 5/6] Update qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/RestAddRetentionLeaseHandler.java --- .../retention_lease_bwc/RestAddRetentionLeaseHandler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/RestAddRetentionLeaseHandler.java b/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/RestAddRetentionLeaseHandler.java index cc0582cdf65db..4bb8a0076f517 100644 --- a/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/RestAddRetentionLeaseHandler.java +++ b/qa/retention-lease-bwc/src/main/java/org/elasticsearch/retention_lease_bwc/RestAddRetentionLeaseHandler.java @@ -79,7 +79,9 @@ protected void processResponse( channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); } - }, numberOfShards, Collections.emptyList()); + }, + numberOfShards, + Collections.emptyList()); for (int i = 0; i < numberOfShards; i++) { final ShardId shardId = new ShardId(indexMetaData.getIndex(), i); client.execute( From 8554e958f43ad38c81699be9c0c74f340e774fe3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 7 Mar 2019 11:53:11 -0500 Subject: [PATCH 6/6] Simplify --- .../add_retention_lease/RetentionLeaseBwcIT.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java b/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java index 8cd9b8215c85d..201a9cf7ce1d8 100644 --- a/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java +++ b/qa/retention-lease-bwc/src/test/java/org/elasticsearch/add_retention_lease/RetentionLeaseBwcIT.java @@ -102,17 +102,16 @@ public void testRetentionLeaseBwcIT() throws IOException { assertThat(shardsStats, hasSize(2)); boolean primaryFound = false; for (final Object shardStats : shardsStats) { - @SuppressWarnings("unchecked") final Map shardStatsAsMap = (Map) shardStats; - @SuppressWarnings("unchecked") final Map routing = (Map) shardStatsAsMap.get("routing"); + final Map shardStatsAsMap = (Map) shardStats; + final Map routing = (Map) shardStatsAsMap.get("routing"); if (Boolean.FALSE.equals(routing.get("primary"))) { continue; } primaryFound = true; - @SuppressWarnings("unchecked") final Map retentionLeases = - (Map) shardStatsAsMap.get("retention_leases"); - @SuppressWarnings("unchecked") final ArrayList leases = (ArrayList) retentionLeases.get("leases"); + final Map retentionLeases = (Map) shardStatsAsMap.get("retention_leases"); + final List leases = (List) retentionLeases.get("leases"); assertThat(leases, hasSize(1)); - @SuppressWarnings("unchecked") final Map lease = (Map) leases.get(0); + final Map lease = (Map) leases.get(0); assertThat(lease.get("id"), equalTo("test-1")); assertThat(lease.get("retaining_seq_no"), equalTo(0)); assertThat(lease.get("source"), equalTo("rest"));