diff --git a/x-pack/plugin/rollup/build.gradle b/x-pack/plugin/rollup/build.gradle index f6dd4ece901d7..c258602938b1d 100644 --- a/x-pack/plugin/rollup/build.gradle +++ b/x-pack/plugin/rollup/build.gradle @@ -15,6 +15,7 @@ dependencies { compileOnly project(path: xpackModule('core')) compileOnly project(path: xpackModule('analytics')) compileOnly project(path: xpackModule('mapper-aggregate-metric')) + compileOnly project(path: xpackModule('ilm')) compileOnly project(':modules:data-streams') testImplementation(testArtifact(project(xpackModule('core')))) } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java index 748bbc5713d52..e896e648d9583 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java @@ -38,6 +38,8 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.Index; @@ -61,7 +63,6 @@ import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction; import java.io.IOException; -import java.time.Instant; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -79,6 +80,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction private final Client client; private final ClusterService clusterService; private final MetadataCreateIndexService metadataCreateIndexService; + private final IndexScopedSettings indexScopedSettings; /** * This is the cluster state task executor for cluster state update actions. @@ -107,7 +109,8 @@ public TransportRollupAction( ThreadPool threadPool, MetadataCreateIndexService metadataCreateIndexService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + IndexScopedSettings indexScopedSettings ) { super( RollupAction.NAME, @@ -122,6 +125,7 @@ public TransportRollupAction( this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN); this.clusterService = clusterService; this.metadataCreateIndexService = metadataCreateIndexService; + this.indexScopedSettings = indexScopedSettings; } @Override @@ -242,11 +246,12 @@ protected void masterOperation( client.execute(RollupIndexerAction.INSTANCE, rollupIndexerRequest, ActionListener.wrap(indexerResp -> { if (indexerResp.isCreated()) { // 4. Make rollup index read-only and set the correct number of replicas - final Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_BLOCKS_WRITE, true) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas()) - .build(); - UpdateSettingsRequest updateSettingsReq = new UpdateSettingsRequest(settings, rollupIndexName); + final Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true); + // Number of replicas had been previously set to 0 to speed up index population + if (sourceIndexMetadata.getNumberOfReplicas() > 0) { + settings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas()); + } + UpdateSettingsRequest updateSettingsReq = new UpdateSettingsRequest(settings.build(), rollupIndexName); updateSettingsReq.setParentTask(parentTask); client.admin().indices().updateSettings(updateSettingsReq, ActionListener.wrap(updateSettingsResponse -> { if (updateSettingsResponse.isAcknowledged()) { @@ -254,7 +259,7 @@ protected void masterOperation( refreshIndex(rollupIndexName, parentTask, ActionListener.wrap(refreshIndexResponse -> { if (refreshIndexResponse.getFailedShards() == 0) { // 6. Mark rollup index as "completed successfully" - updateRollupMetadata(sourceIndexName, rollupIndexName, request, ActionListener.wrap(resp -> { + updateRollupMetadata(rollupIndexName, request, ActionListener.wrap(resp -> { if (resp.isAcknowledged()) { // 7. Force-merge the rollup index to a single segment forceMergeIndex( @@ -427,42 +432,42 @@ public static String createRollupIndexMapping( } /** - * Copy index metadata from the source index to the rollup index. + * Copy index settings from the source index to the rollup index. Settings that + * have already been set in the rollup index will not be overridden. */ private IndexMetadata.Builder copyIndexMetadata(IndexMetadata sourceIndexMetadata, IndexMetadata rollupIndexMetadata) { - String sourceIndexName = sourceIndexMetadata.getIndex().getName(); + // Copy index settings from the source index, but do not override the settings + // that already have been set in the rollup index + final Settings.Builder targetSettings = Settings.builder().put(rollupIndexMetadata.getSettings()); + for (final String key : sourceIndexMetadata.getSettings().keySet()) { + final Setting setting = indexScopedSettings.get(key); + if (setting == null) { + assert indexScopedSettings.isPrivateSetting(key) : "expected [" + key + "] to be private but it was not"; + } else if (setting.getProperties().contains(Setting.Property.NotCopyableOnResize)) { + // we leverage the NotCopyableOnResize setting property for rollup, because + // the same rules with resize apply + continue; + } + // do not override settings that have already been set in the rollup index + if (targetSettings.keys().contains(key)) { + continue; + } + targetSettings.copy(key, sourceIndexMetadata.getSettings()); + } /* * Add the source index name and UUID to the rollup index metadata. * If the source index is a rollup index, we will add the name and UUID * of the first index that we initially rolled up. */ - String originalIndexName = IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.exists(sourceIndexMetadata.getSettings()) - ? IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.get(sourceIndexMetadata.getSettings()) - : sourceIndexName; - String originalIndexUuid = IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.exists(sourceIndexMetadata.getSettings()) - ? IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.get(sourceIndexMetadata.getSettings()) - : sourceIndexMetadata.getIndexUUID(); - - // Copy time series index settings from original index - List indexRoutingPath = sourceIndexMetadata.getRoutingPaths(); - Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(sourceIndexMetadata.getSettings()); - Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(sourceIndexMetadata.getSettings()); - IndexMode indexMode = IndexSettings.MODE.get(sourceIndexMetadata.getSettings()); - - return IndexMetadata.builder(rollupIndexMetadata) - .settings( - Settings.builder() - .put(rollupIndexMetadata.getSettings()) - .put(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.getKey(), originalIndexName) - .put(IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.getKey(), originalIndexUuid) - .put(IndexMetadata.INDEX_HIDDEN_SETTING.getKey(), sourceIndexMetadata.isHidden()) - // Add the time series index settings - .put(IndexSettings.MODE.getKey(), indexMode) - .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), indexRoutingPath) - .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), startTime.toString()) - .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), endTime.toString()) - ); + if (IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.exists(sourceIndexMetadata.getSettings()) == false + || IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.exists(sourceIndexMetadata.getSettings()) == false) { + Index sourceIndex = sourceIndexMetadata.getIndex(); + targetSettings.put(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.getKey(), sourceIndex.getName()) + .put(IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.getKey(), sourceIndex.getUUID()); + } + + return IndexMetadata.builder(rollupIndexMetadata).settings(targetSettings); } /** @@ -524,12 +529,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { }, ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()), STATE_UPDATE_TASK_EXECUTOR); } - private void updateRollupMetadata( - String sourceIndexName, - String rollupIndexName, - RollupAction.Request request, - ActionListener listener - ) { + private void updateRollupMetadata(String rollupIndexName, RollupAction.Request request, ActionListener listener) { // 6. Mark rollup index as "completed successfully" ("index.rollup.status": "success") clusterService.submitStateUpdateTask( "update-rollup-metadata [" + rollupIndexName + "]", diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java index ff3201b0c23c7..4378280cfdb17 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -58,10 +59,13 @@ import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; import org.elasticsearch.xpack.analytics.AnalyticsPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupActionConfig; import org.elasticsearch.xpack.core.rollup.action.RollupAction; import org.elasticsearch.xpack.core.rollup.action.RollupActionRequestValidationException; +import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.elasticsearch.xpack.rollup.Rollup; import org.junit.Before; @@ -104,7 +108,8 @@ protected Collection> getPlugins() { Rollup.class, AnalyticsPlugin.class, AggregateMetricMapperPlugin.class, - DataStreamsPlugin.class + DataStreamsPlugin.class, + IndexLifecycle.class ); } @@ -115,7 +120,7 @@ public void setup() { startTime = randomLongBetween(946769284000L, 1607470084000L); // random date between 2000-2020 docCount = randomIntBetween(10, 9000); numOfShards = randomIntBetween(1, 4); - numOfReplicas = 0; // Since this is a single node, we cannot have replicas + numOfReplicas = randomIntBetween(0, 3); // Values for keyword dimensions dimensionValues = new ArrayList<>(MAX_DIM_VALUES); @@ -170,6 +175,38 @@ public void testRollupIndex() throws IOException { assertRollupIndex(sourceIndex, rollupIndex, config); } + public void testCopyIndexSettings() throws IOException { + Settings settings = Settings.builder() + .put(LifecycleSettings.LIFECYCLE_NAME, randomAlphaOfLength(5)) + .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.getKey(), randomAlphaOfLength(5)) + .put(LifecycleSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING.getKey(), randomBoolean()) + .build(); + logger.info("Updating index [{}] with settings [{}]", sourceIndex, settings); + + var updateSettingsReq = new UpdateSettingsRequest(settings, sourceIndex); + var r = client().admin().indices().updateSettings(updateSettingsReq).actionGet(); + assertTrue("Update settings not acked", r.isAcknowledged()); + + RollupActionConfig config = new RollupActionConfig(randomInterval()); + SourceSupplier sourceSupplier = () -> { + String ts = randomDateForInterval(config.getInterval()); + return XContentFactory.jsonBuilder() + .startObject() + .field(FIELD_TIMESTAMP, ts) + .field(FIELD_DIMENSION_1, randomFrom(dimensionValues)) + .field(FIELD_NUMERIC_1, randomInt()) + .endObject(); + }; + bulkIndex(sourceSupplier); + prepareSourceIndex(sourceIndex); + rollup(sourceIndex, rollupIndex, config); + + GetIndexResponse indexSettingsResp = client().admin().indices().prepareGetIndex().addIndices(rollupIndex).get(); + for (String key : settings.keySet()) { + assertEquals(settings.get(key), indexSettingsResp.getSetting(rollupIndex, key)); + } + } + public void testNullSourceIndexName() { RollupActionConfig config = new RollupActionConfig(randomInterval()); ActionRequestValidationException exception = expectThrows(