Skip to content

Commit

Permalink
Add BWC for retention leases
Browse files Browse the repository at this point in the history
We have to handle the case of a < 6.7.0 and >= 6.7.0 mixed cluster
wherein a primary on >= 6.7.0 would otherwise send retention leases to a
< 6.7.0 node which would not understand them. This commit adds BWC for
this case, and adds a test to ensure that we behave properly here.
  • Loading branch information
jasontedor committed Feb 27, 2019
1 parent 203689a commit 155a363
Show file tree
Hide file tree
Showing 7 changed files with 495 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ class ClusterFormationTasks {
if (esConfig.containsKey('discovery.zen.hosts_provider') == false) {
esConfig['discovery.zen.hosts_provider'] = 'file'
}
esConfig['discovery.zen.ping.unicast.hosts'] = []
if (esConfig.containsKey('discovery.zen.ping.unicast.hosts') == false) {
esConfig['discovery.zen.ping.unicast.hosts'] = []
}
esConfig
}
dependsOn = startDependencies
Expand Down
63 changes: 63 additions & 0 deletions qa/retention-lease-bwc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.test.RestIntegTestTask

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

apply plugin: 'elasticsearch.esplugin'

esplugin {
description 'add retention lease plugin'
classname 'org.elasticsearch.retention_lease_bwc.AddRetentionLeasePlugin'
}

integTest.enabled = false

task oldClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}

oldClusterTestCluster {
numNodes = 2
numBwcNodes = 2
bwcVersion = Version.fromString("6.6.2-SNAPSHOT")
setting "cluster.name", "retention-lease-bwc"
}

task newClusterTest(type: RestIntegTestTask) {

}

newClusterTestCluster {
dependsOn "oldClusterTestCluster#wait"
numNodes = 1
plugin ":qa:retention-lease-bwc"
setting "discovery.zen.ping.unicast.hosts", "\"${-> oldClusterTest.nodes.get(0).transportUri()}\""
setting "cluster.name", "retention-lease-bwc"
setting "node.name", "new-node"
}

newClusterTestRunner {
finalizedBy "oldClusterTestCluster#node0.stop"
finalizedBy "oldClusterTestCluster#node1.stop"
finalizedBy "newClusterTestCluster#stop"
}

check.dependsOn newClusterTest
unitTest.enabled = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.retention_lease_bwc;

import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class AddRetentionLeasePlugin extends Plugin implements ActionPlugin {

@Override
public List<RestHandler> getRestHandlers(
final Settings settings,
final RestController restController,
final ClusterSettings clusterSettings,
final IndexScopedSettings indexScopedSettings,
final SettingsFilter settingsFilter,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<DiscoveryNodes> nodesInCluster) {
return Collections.singletonList(new RestAddRetentionLeaseHandler(settings, restController));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.retention_lease_bwc;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestActionListener;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Collections;

public class RestAddRetentionLeaseHandler extends BaseRestHandler {

public RestAddRetentionLeaseHandler(final Settings settings, final RestController restController) {
super(settings);
restController.registerHandler(RestRequest.Method.PUT, "/{index}/_add_retention_lease", this);
}

@Override
public String getName() {
return "add_retention_lease";
}

@Override
protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
final String index = request.param("index");
final String id = request.param("id");
final long retainingSequenceNumber = Long.parseLong(request.param("retaining_sequence_number"));
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(index);
return channel ->
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(final ClusterStateResponse clusterStateResponse) {
final IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().index(index);
final int numberOfShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetaData.getSettings());

final GroupedActionListener<RetentionLeaseActions.Response> listener = new GroupedActionListener<>(
new RestActionListener<Collection<RetentionLeaseActions.Response>>(channel) {

@Override
protected void processResponse(
final Collection<RetentionLeaseActions.Response> responses) throws Exception {
final XContentBuilder builder = channel.newBuilder().startObject().endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}

}, numberOfShards, Collections.emptyList());
for (int i = 0; i < numberOfShards; i++) {
final ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
client.execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(shardId, id, retainingSequenceNumber, "rest"),
listener);
}
}

@Override
public void onFailure(final Exception e) {
try {
channel.sendResponse(new BytesRestResponse(channel, RestStatus.SERVICE_UNAVAILABLE, e));
} catch (IOException inner) {
inner.addSuppressed(e);
throw new UncheckedIOException(inner);
}
}
});
}
}
Loading

0 comments on commit 155a363

Please sign in to comment.