Skip to content

Commit

Permalink
[7.17] Register Feature migration persistent task state named XContent (
Browse files Browse the repository at this point in the history
#84192) (#84258)

In addition to the backport, this commit also adds SystemIndexMigrationTaskParams/State to writeableRegistry of transport client
This will backport the following commits from master to 7.17:

Register Feature migration persistent task state named XContent (#84192)
  • Loading branch information
gwbrown authored Feb 23, 2022
1 parent d90d673 commit 68251d4
Show file tree
Hide file tree
Showing 14 changed files with 553 additions and 271 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/84192.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 84192
summary: Registration of `SystemIndexMigrationTask` named xcontent objects
area: Infra/Core
type: bug
issues:
- 84115
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.migration;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.AssociatedIndexDescriptor;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.Assert;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase {

static final String VERSION_META_KEY = "version";
static final Version META_VERSION = Version.CURRENT;
static final String DESCRIPTOR_MANAGED_META_KEY = "desciptor_managed";
static final String DESCRIPTOR_INTERNAL_META_KEY = "descriptor_internal";
static final String FEATURE_NAME = "A-test-feature"; // Sorts alphabetically before the feature from MultiFeatureMigrationIT
static final String ORIGIN = AbstractFeatureMigrationIntegTest.class.getSimpleName();
static final String FlAG_SETTING_KEY = IndexMetadata.INDEX_PRIORITY_SETTING.getKey();
static final String INTERNAL_MANAGED_INDEX_NAME = ".int-man-old";
static final int INDEX_DOC_COUNT = 100; // arbitrarily chosen
static final int INTERNAL_MANAGED_FLAG_VALUE = 1;
public static final Version NEEDS_UPGRADE_VERSION = Version.V_6_0_0;

static final SystemIndexDescriptor EXTERNAL_UNMANAGED = SystemIndexDescriptor.builder()
.setIndexPattern(".ext-unman-*")
.setType(SystemIndexDescriptor.Type.EXTERNAL_UNMANAGED)
.setOrigin(ORIGIN)
.setVersionMetaKey(VERSION_META_KEY)
.setAllowedElasticProductOrigins(Collections.singletonList(ORIGIN))
.setMinimumNodeVersion(NEEDS_UPGRADE_VERSION)
.setPriorSystemIndexDescriptors(Collections.emptyList())

.build();
static final SystemIndexDescriptor INTERNAL_UNMANAGED = SystemIndexDescriptor.builder()
.setIndexPattern(".int-unman-*")
.setType(SystemIndexDescriptor.Type.INTERNAL_UNMANAGED)
.setOrigin(ORIGIN)
.setVersionMetaKey(VERSION_META_KEY)
.setAllowedElasticProductOrigins(Collections.emptyList())
.setMinimumNodeVersion(NEEDS_UPGRADE_VERSION)
.setPriorSystemIndexDescriptors(Collections.emptyList())
.build();

static final SystemIndexDescriptor INTERNAL_MANAGED = SystemIndexDescriptor.builder()
.setIndexPattern(".int-man-*")
.setAliasName(".internal-managed-alias")
.setPrimaryIndex(INTERNAL_MANAGED_INDEX_NAME)
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
.setSettings(createSimpleSettings(NEEDS_UPGRADE_VERSION, INTERNAL_MANAGED_FLAG_VALUE))
.setMappings(createSimpleMapping(true, true, false))
.setOrigin(ORIGIN)
.setVersionMetaKey(VERSION_META_KEY)
.setAllowedElasticProductOrigins(Collections.emptyList())
.setMinimumNodeVersion(NEEDS_UPGRADE_VERSION)
.setPriorSystemIndexDescriptors(Collections.emptyList())
.setIndexType("doc") // This simulates `.tasks`, which uses a nonstandard type name. EXTERNAL_MANAGED tests the default type.
.build();
static final int INTERNAL_UNMANAGED_FLAG_VALUE = 2;
static final int EXTERNAL_MANAGED_FLAG_VALUE = 3;
static final SystemIndexDescriptor EXTERNAL_MANAGED = SystemIndexDescriptor.builder()
.setIndexPattern(".ext-man-*")
.setAliasName(".external-managed-alias")
.setPrimaryIndex(".ext-man-old")
.setType(SystemIndexDescriptor.Type.EXTERNAL_MANAGED)
.setSettings(createSimpleSettings(NEEDS_UPGRADE_VERSION, EXTERNAL_MANAGED_FLAG_VALUE))
.setMappings(createSimpleMapping(true, false, true))
.setOrigin(ORIGIN)
.setVersionMetaKey(VERSION_META_KEY)
.setAllowedElasticProductOrigins(Collections.singletonList(ORIGIN))
.setMinimumNodeVersion(NEEDS_UPGRADE_VERSION)
.setPriorSystemIndexDescriptors(Collections.emptyList())
.build();
static final int EXTERNAL_UNMANAGED_FLAG_VALUE = 4;
static final String ASSOCIATED_INDEX_NAME = ".my-associated-idx";

@Before
public void setupTestPlugin() {
TestPlugin.preMigrationHook.set((state) -> Collections.emptyMap());
TestPlugin.postMigrationHook.set((state, metadata) -> {});
}

public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) throws InterruptedException {
Assert.assertTrue(
"the strategy used below to create index names for descriptors without a primary index name only works for simple patterns",
descriptor.getIndexPattern().endsWith("*")
);
String indexName = Optional.ofNullable(descriptor.getPrimaryIndex()).orElse(descriptor.getIndexPattern().replace("*", "old"));
CreateIndexRequestBuilder createRequest = prepareCreate(indexName);
createRequest.setWaitForActiveShards(ActiveShardCount.ALL);
if (descriptor.getSettings() != null) {
createRequest.setSettings(
Settings.builder()
.put("index.version.created", Version.CURRENT)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build()
);
} else {
createRequest.setSettings(
createSimpleSettings(
NEEDS_UPGRADE_VERSION,
descriptor.isInternal() ? INTERNAL_UNMANAGED_FLAG_VALUE : EXTERNAL_UNMANAGED_FLAG_VALUE
)
);
}
if (descriptor.getMappings() == null) {
createRequest.addMapping("doc", createSimpleMapping(false, descriptor.isInternal(), false), XContentType.JSON);
}
CreateIndexResponse response = createRequest.get();
Assert.assertTrue(response.isShardsAcknowledged());

List<IndexRequestBuilder> docs = new ArrayList<>(INDEX_DOC_COUNT);
for (int i = 0; i < INDEX_DOC_COUNT; i++) {
docs.add(
ESIntegTestCase.client().prepareIndex(indexName, "_doc").setId(Integer.toString(i)).setSource("some_field", "words words")
);
}
indexRandom(true, docs);
IndicesStatsResponse indexStats = ESIntegTestCase.client().admin().indices().prepareStats(indexName).setDocs(true).get();
Assert.assertThat(indexStats.getIndex(indexName).getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT));
}

static Settings createSimpleSettings(Version creationVersion, int flagSettingValue) {
return Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(FlAG_SETTING_KEY, flagSettingValue)
.put("index.version.created", creationVersion)
.build();
}

static String createSimpleMapping(boolean descriptorManaged, boolean descriptorInternal, boolean useStandardType) {
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
if (useStandardType) {
builder.startObject("_doc");
} else {
builder.startObject("doc");
}
{
builder.startObject("_meta");
builder.field(VERSION_META_KEY, META_VERSION);
builder.field(DESCRIPTOR_MANAGED_META_KEY, descriptorManaged);
builder.field(DESCRIPTOR_INTERNAL_META_KEY, descriptorInternal);
builder.endObject();

builder.field("dynamic", "strict");
builder.startObject("properties");
{
builder.startObject("some_field");
builder.field("type", "keyword");
builder.endObject();
}
builder.endObject();
}
builder.endObject();
builder.endObject();
return Strings.toString(builder);
} catch (IOException e) {
// Just rethrow, it should be impossible for this to throw here
throw new AssertionError(e);
}
}

public void assertIndexHasCorrectProperties(
Metadata metadata,
String indexName,
int settingsFlagValue,
boolean isManaged,
boolean isInternal,
Collection<String> aliasNames
) {
IndexMetadata imd = metadata.index(indexName);
assertThat(imd.getSettings().get(FlAG_SETTING_KEY), equalTo(Integer.toString(settingsFlagValue)));
final Map<String, Object> mapping = imd.mapping().getSourceAsMap();
@SuppressWarnings("unchecked")
final Map<String, Object> meta = (Map<String, Object>) mapping.get("_meta");
assertThat(meta.get(DESCRIPTOR_MANAGED_META_KEY), is(isManaged));
assertThat(meta.get(DESCRIPTOR_INTERNAL_META_KEY), is(isInternal));

assertThat(imd.isSystem(), is(true));

Set<String> actualAliasNames = imd.getAliases().keySet();
assertThat(actualAliasNames, containsInAnyOrder(aliasNames.toArray()));

IndicesStatsResponse indexStats = client().admin().indices().prepareStats(imd.getIndex().getName()).setDocs(true).get();
assertNotNull(indexStats);
final IndexStats thisIndexStats = indexStats.getIndex(imd.getIndex().getName());
assertNotNull(thisIndexStats);
assertNotNull(thisIndexStats.getTotal());
assertNotNull(thisIndexStats.getTotal().getDocs());
assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT));
}

public static class TestPlugin extends Plugin implements SystemIndexPlugin {
public static final AtomicReference<Function<ClusterState, Map<String, Object>>> preMigrationHook = new AtomicReference<>();
public static final AtomicReference<BiConsumer<ClusterState, Map<String, Object>>> postMigrationHook = new AtomicReference<>();

public TestPlugin() {

}

@Override
public String getFeatureName() {
return FEATURE_NAME;
}

@Override
public String getFeatureDescription() {
return "a plugin for testing system index migration";
}

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Arrays.asList(INTERNAL_MANAGED, INTERNAL_UNMANAGED, EXTERNAL_MANAGED, EXTERNAL_UNMANAGED);
}

@Override
public Collection<AssociatedIndexDescriptor> getAssociatedIndexDescriptors() {

return Collections.singletonList(new AssociatedIndexDescriptor(ASSOCIATED_INDEX_NAME, TestPlugin.class.getCanonicalName()));
}

@Override
public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener<Map<String, Object>> listener) {
listener.onResponse(preMigrationHook.get().apply(clusterService.state()));
}

@Override
public void indicesMigrationComplete(
Map<String, Object> preUpgradeMetadata,
ClusterService clusterService,
Client client,
ActionListener<Boolean> listener
) {
postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata);
listener.onResponse(true);
}
}
}
Loading

0 comments on commit 68251d4

Please sign in to comment.