Skip to content

Commit

Permalink
Add BWC for retention leases (#39482)
Browse files Browse the repository at this point in the history
We have to handle the case of a < 6.7.0 and >= 6.7.0 mixed cluster
wherein a primary on >= 6.7.0 would otherwise send retention leases to a
< 6.7.0 node which would not understand them. This commit adds BWC for
this case, and adds a test to ensure that we behave properly here.
  • Loading branch information
jasontedor authored Mar 8, 2019
1 parent da9f55c commit a24a1f5
Show file tree
Hide file tree
Showing 7 changed files with 535 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ class ClusterFormationTasks {
if (esConfig.containsKey('discovery.zen.hosts_provider') == false) {
esConfig['discovery.zen.hosts_provider'] = 'file'
}
esConfig['discovery.zen.ping.unicast.hosts'] = []
if (esConfig.containsKey('discovery.zen.ping.unicast.hosts') == false) {
esConfig['discovery.zen.ping.unicast.hosts'] = []
}
esConfig
}
dependsOn = startDependencies
Expand Down
104 changes: 104 additions & 0 deletions qa/retention-lease-bwc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.test.RestIntegTestTask

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

apply plugin: 'elasticsearch.esplugin'

esplugin {
description 'add retention lease plugin'
classname 'org.elasticsearch.retention_lease_bwc.AddRetentionLeasePlugin'
}

integTest.enabled = false

task bwcTest {
description = 'runs retention lease backwards compatability tests'
group = 'verification'
}

for (Version version : bwcVersions.wireCompatible) {
if (version.before("6.5.0")) {
// versions before 6.5.0 do not support soft deletes
continue
}

final String baseName = "v${version}"

final Task oldClusterTest = tasks.create(name: "${baseName}#oldClusterTest", type: RestIntegTestTask) {
mustRunAfter(precommit)
includePackaged = false
}

final Object oldClusterTestCluster = extensions.findByName("${baseName}#oldClusterTestCluster")

configure(oldClusterTestCluster) {
numNodes = 2
numBwcNodes = 2
bwcVersion = version
setting "cluster.name", "retention-lease-bwc"
}

final Task newClusterTest = tasks.create(name: "${baseName}#newClusterTest", type: RestIntegTestTask) {

}

final Object newClusterTestCluster = extensions.findByName("${baseName}#newClusterTestCluster")

configure(newClusterTestCluster) {
dependsOn "${baseName}#oldClusterTestCluster#wait"
numNodes = 1
plugin ":qa:retention-lease-bwc"
setting "discovery.zen.ping.unicast.hosts", "\"${-> oldClusterTest.nodes.get(0).transportUri()}\""
setting "cluster.name", "retention-lease-bwc"
setting "node.name", "new-node"
}

final Object newClusterTestRunner = tasks.findByName("${baseName}#newClusterTestRunner")

configure(newClusterTestRunner) {
finalizedBy "${baseName}#oldClusterTestCluster#node0.stop"
finalizedBy "${baseName}#oldClusterTestCluster#node1.stop"
finalizedBy "${baseName}#newClusterTestCluster#stop"
}

final Task versionBwcTest = tasks.create(name: "${baseName}#bwcTest") {
dependsOn newClusterTest
}

if (project.bwc_tests_enabled) {
bwcTest.dependsOn(versionBwcTest)
}
}

task bwcTestSnapshots {
if (project.bwc_tests_enabled) {
for (final def version : bwcVersions.unreleasedWireCompatible) {
// versions before 6.5.0 do not support soft deletes
if (version.before("6.5.0")) {
continue
}
dependsOn "v${version}#bwcTest"
}
}
}

check.dependsOn bwcTestSnapshots
unitTest.enabled = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.retention_lease_bwc;

import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class AddRetentionLeasePlugin extends Plugin implements ActionPlugin {

@Override
public List<RestHandler> getRestHandlers(
final Settings settings,
final RestController restController,
final ClusterSettings clusterSettings,
final IndexScopedSettings indexScopedSettings,
final SettingsFilter settingsFilter,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<DiscoveryNodes> nodesInCluster) {
return Collections.singletonList(new RestAddRetentionLeaseHandler(settings, restController));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.retention_lease_bwc;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActionListener;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Collections;

public class RestAddRetentionLeaseHandler extends BaseRestHandler {

public RestAddRetentionLeaseHandler(final Settings settings, final RestController restController) {
super(settings);
restController.registerHandler(RestRequest.Method.PUT, "/{index}/_add_retention_lease", this);
}

@Override
public String getName() {
return "add_retention_lease";
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final String index = request.param("index");
final String id = request.param("id");
final long retainingSequenceNumber = Long.parseLong(request.param("retaining_sequence_number"));
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(index);
return channel ->
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(final ClusterStateResponse clusterStateResponse) {
final IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().index(index);
final int numberOfShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetaData.getSettings());

final GroupedActionListener<RetentionLeaseActions.Response> listener = new GroupedActionListener<>(
new RestActionListener<Collection<RetentionLeaseActions.Response>>(channel) {

@Override
protected void processResponse(
final Collection<RetentionLeaseActions.Response> responses) throws Exception {
final XContentBuilder builder = channel.newBuilder().startObject().endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}

},
numberOfShards,
Collections.emptyList());
for (int i = 0; i < numberOfShards; i++) {
final ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
client.execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(shardId, id, retainingSequenceNumber, "rest"),
listener);
}
}

@Override
public void onFailure(final Exception e) {
try {
channel.sendResponse(new BytesRestResponse(channel, RestStatus.SERVICE_UNAVAILABLE, e));
} catch (IOException inner) {
inner.addSuppressed(e);
throw new UncheckedIOException(inner);
}
}
});
}
}
Loading

0 comments on commit a24a1f5

Please sign in to comment.