diff --git a/docs/changelog/87269.yaml b/docs/changelog/87269.yaml new file mode 100644 index 0000000000000..1c401c7669ba6 --- /dev/null +++ b/docs/changelog/87269.yaml @@ -0,0 +1,6 @@ +pr: 87269 +summary: "TSDB: Implement downsampling ILM Action for time-series indices" +area: TSDB +type: feature +issues: + - 68609 diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index ab0f3d91e1e33..032a5acc90c90 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -1138,6 +1138,7 @@ public Index getResizeSourceIndex() { ); public enum RollupTaskStatus { + UNKNOWN, STARTED, SUCCESS; @@ -1150,7 +1151,7 @@ public String toString() { public static final Setting INDEX_ROLLUP_STATUS = Setting.enumSetting( RollupTaskStatus.class, INDEX_ROLLUP_STATUS_KEY, - RollupTaskStatus.SUCCESS, + RollupTaskStatus.UNKNOWN, Property.IndexScope, Property.InternalIndex ); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CleanupTargetIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CleanupTargetIndexStep.java new file mode 100644 index 0000000000000..55f373d46a292 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CleanupTargetIndexStep.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.ilm; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; + +import java.util.Objects; +import java.util.function.Function; + +/** + * Deletes the target index created by an operation such as shrink or rollup and + * identified the target index name stored in the lifecycle state of the managed + * index (if any was generated) + */ +public class CleanupTargetIndexStep extends AsyncRetryDuringSnapshotActionStep { + public static final String NAME = "cleanup-target-index"; + private static final Logger logger = LogManager.getLogger(CleanupTargetIndexStep.class); + + private final Function sourceIndexNameSupplier; + private final Function targetIndexNameSupplier; + + public CleanupTargetIndexStep( + StepKey key, + StepKey nextStepKey, + Client client, + Function sourceIndexNameSupplier, + Function targetIndexNameSupplier + ) { + super(key, nextStepKey, client); + this.sourceIndexNameSupplier = sourceIndexNameSupplier; + this.targetIndexNameSupplier = targetIndexNameSupplier; + } + + @Override + public boolean isRetryable() { + return true; + } + + Function getSourceIndexNameSupplier() { + return sourceIndexNameSupplier; + } + + Function getTargetIndexNameSupplier() { + return targetIndexNameSupplier; + } + + @Override + void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener listener) { + final String sourceIndexName = sourceIndexNameSupplier.apply(indexMetadata); + if (Strings.isNullOrEmpty(sourceIndexName) == false) { + // the current managed index is the target index + if (currentClusterState.metadata().index(sourceIndexName) == null) { + // if the source index does not exist, we'll skip deleting the + // (managed) target index as that will cause data loss + String policyName = indexMetadata.getLifecyclePolicyName(); + logger.warn( + "managed index [{}] has been created as part of policy [{}] and the source index [{}] does not exist " + + "anymore. will skip the [{}] step", + indexMetadata.getIndex().getName(), + policyName, + sourceIndexName, + NAME + ); + listener.onResponse(null); + return; + } + } + + final String targetIndexName = targetIndexNameSupplier.apply(indexMetadata); + // if the target index was not generated there is nothing to delete so we move on + if (Strings.hasText(targetIndexName) == false) { + listener.onResponse(null); + return; + } + getClient().admin() + .indices() + .delete(new DeleteIndexRequest(targetIndexName).masterNodeTimeout(TimeValue.MAX_VALUE), new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + // even if not all nodes acked the delete request yet we can consider this operation as successful as + // we'll generate a new index name and attempt to create an index with the newly generated name + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof IndexNotFoundException) { + // we can move on if the index was deleted in the meantime + listener.onResponse(null); + } else { + listener.onFailure(e); + } + } + }); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CleanupTargetIndexStep that = (CleanupTargetIndexStep) o; + return super.equals(o) + && Objects.equals(targetIndexNameSupplier, that.targetIndexNameSupplier) + && Objects.equals(sourceIndexNameSupplier, that.sourceIndexNameSupplier); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), targetIndexNameSupplier, sourceIndexNameSupplier); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CopySettingsStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CopySettingsStep.java index e9cbf4c29408b..e2a625ae05daf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CopySettingsStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/CopySettingsStep.java @@ -10,19 +10,21 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import java.util.Locale; import java.util.Objects; +import java.util.function.BiFunction; /** * Copy the provided settings from the source to the target index. *

- * The target index is derived from the source index using the provided prefix. - * This is useful for actions like shrink or searchable snapshot that create a new index and migrate the ILM execution from the source - * to the target index. + * The target index is generated by a supplier function. + * This is useful for actions like shrink, rollup or searchable snapshot that create + * a new index and migrate the ILM execution from the source to the target index. */ public class CopySettingsStep extends ClusterStateActionStep { public static final String NAME = "copy-settings"; @@ -30,14 +32,19 @@ public class CopySettingsStep extends ClusterStateActionStep { private static final Logger logger = LogManager.getLogger(CopySettingsStep.class); private final String[] settingsKeys; - private final String indexPrefix; - public CopySettingsStep(StepKey key, StepKey nextStepKey, String indexPrefix, String... settingsKeys) { + private final BiFunction targetIndexNameSupplier; + + public CopySettingsStep( + StepKey key, + StepKey nextStepKey, + BiFunction targetIndexNameSupplier, + String... settingsKeys + ) { super(key, nextStepKey); - Objects.requireNonNull(indexPrefix); Objects.requireNonNull(settingsKeys); - this.indexPrefix = indexPrefix; this.settingsKeys = settingsKeys; + this.targetIndexNameSupplier = targetIndexNameSupplier; } @Override @@ -49,17 +56,14 @@ public String[] getSettingsKeys() { return settingsKeys; } - public String getIndexPrefix() { - return indexPrefix; - } + BiFunction getTargetIndexNameSupplier() { + return targetIndexNameSupplier; + }; @Override public ClusterState performAction(Index index, ClusterState clusterState) { String sourceIndexName = index.getName(); IndexMetadata sourceIndexMetadata = clusterState.metadata().index(sourceIndexName); - String targetIndexName = indexPrefix + sourceIndexName; - IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndexName); - if (sourceIndexMetadata == null) { // Index must have been since deleted, ignore it logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), sourceIndexName); @@ -70,6 +74,8 @@ public ClusterState performAction(Index index, ClusterState clusterState) { return clusterState; } + String targetIndexName = targetIndexNameSupplier.apply(sourceIndexName, sourceIndexMetadata.getLifecycleExecutionState()); + IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndexName); if (targetIndexMetadata == null) { String errorMessage = String.format( Locale.ROOT, @@ -107,11 +113,13 @@ public boolean equals(Object o) { return false; } CopySettingsStep that = (CopySettingsStep) o; - return Objects.equals(settingsKeys, that.settingsKeys) && Objects.equals(indexPrefix, that.indexPrefix); + return super.equals(o) + && Objects.equals(targetIndexNameSupplier, that.targetIndexNameSupplier) + && Objects.equals(settingsKeys, that.settingsKeys); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), settingsKeys, indexPrefix); + return Objects.hash(super.hashCode(), targetIndexNameSupplier, settingsKeys); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupILMAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupILMAction.java index 9d57e6d4a3ef0..7440dc1118210 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupILMAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupILMAction.java @@ -7,10 +7,13 @@ package org.elasticsearch.xpack.core.ilm; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.core.Nullable; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; @@ -23,77 +26,70 @@ import java.util.List; import java.util.Objects; +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; + /** * A {@link LifecycleAction} which calls {@link org.elasticsearch.xpack.core.rollup.action.RollupAction} on an index */ public class RollupILMAction implements LifecycleAction { - public static final String NAME = "rollup"; - private static final ParseField CONFIG_FIELD = new ParseField("config"); - private static final ParseField POLICY_FIELD = new ParseField("rollup_policy"); + public static final String NAME = "rollup"; + public static final String ROLLUP_INDEX_PREFIX = "rollup-"; + public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check"; + public static final String GENERATE_ROLLUP_STEP_NAME = "generate-rollup-name"; + private static final ParseField FIXED_INTERVAL_FIELD = new ParseField(RollupActionConfig.FIXED_INTERVAL); - @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME, - a -> new RollupILMAction((RollupActionConfig) a[0], (String) a[1]) + a -> new RollupILMAction((DateHistogramInterval) a[0]) ); - public static final String ROLLUP_INDEX_PREFIX = "rollup-"; - public static final String GENERATE_ROLLUP_STEP_NAME = "generate-rollup-name"; - - private final RollupActionConfig config; - private final String rollupPolicy; static { PARSER.declareField( - ConstructingObjectParser.constructorArg(), - (p, c) -> RollupActionConfig.fromXContent(p), - CONFIG_FIELD, - ObjectParser.ValueType.OBJECT + constructorArg(), + p -> new DateHistogramInterval(p.text()), + FIXED_INTERVAL_FIELD, + ObjectParser.ValueType.STRING ); - PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), POLICY_FIELD); } + private final DateHistogramInterval fixedInterval; + public static RollupILMAction parse(XContentParser parser) { return PARSER.apply(parser, null); } - public RollupILMAction(RollupActionConfig config, @Nullable String rollupPolicy) { - this.config = config; - this.rollupPolicy = rollupPolicy; + public RollupILMAction(DateHistogramInterval fixedInterval) { + if (fixedInterval == null) { + throw new IllegalArgumentException("Parameter [" + FIXED_INTERVAL_FIELD.getPreferredName() + "] is required."); + } + this.fixedInterval = fixedInterval; } public RollupILMAction(StreamInput in) throws IOException { - this(new RollupActionConfig(in), in.readOptionalString()); + this(new DateHistogramInterval(in)); } @Override - public String getWriteableName() { - return NAME; - } - - RollupActionConfig config() { - return config; - } - - String rollupPolicy() { - return rollupPolicy; + public void writeTo(StreamOutput out) throws IOException { + fixedInterval.writeTo(out); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(CONFIG_FIELD.getPreferredName(), config); - if (rollupPolicy != null) { - builder.field(POLICY_FIELD.getPreferredName(), rollupPolicy); - } + builder.field(FIXED_INTERVAL_FIELD.getPreferredName(), fixedInterval.toString()); builder.endObject(); return builder; } @Override - public void writeTo(StreamOutput out) throws IOException { - config.writeTo(out); - out.writeOptionalString(rollupPolicy); + public String getWriteableName() { + return NAME; + } + + public DateHistogramInterval fixedInterval() { + return fixedInterval; } @Override @@ -104,31 +100,113 @@ public boolean isSafeAction() { @Override public List toSteps(Client client, String phase, StepKey nextStepKey) { StepKey checkNotWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME); + StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME); StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyStep.NAME); + StepKey cleanupRollupIndexKey = new StepKey(phase, NAME, CleanupTargetIndexStep.NAME); StepKey generateRollupIndexNameKey = new StepKey(phase, NAME, GENERATE_ROLLUP_STEP_NAME); - StepKey rollupKey = new StepKey(phase, NAME, NAME); - CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNotWriteIndex, readOnlyKey); + StepKey rollupKey = new StepKey(phase, NAME, RollupStep.NAME); + StepKey waitForRollupIndexKey = new StepKey(phase, NAME, WaitForIndexColorStep.NAME); + StepKey copyMetadataKey = new StepKey(phase, NAME, CopyExecutionStateStep.NAME); + StepKey dataStreamCheckBranchingKey = new StepKey(phase, NAME, CONDITIONAL_DATASTREAM_CHECK_KEY); + StepKey replaceDataStreamIndexKey = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME); + StepKey deleteIndexKey = new StepKey(phase, NAME, DeleteStep.NAME); + StepKey swapAliasesKey = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME); + + CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep( + checkNotWriteIndex, + waitForNoFollowerStepKey + ); + WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, cleanupRollupIndexKey, client); + + // We generate a unique rollup index name, but we also retry if the allocation of the rollup index is not possible, so we want to + // delete the "previously generated" rollup index (this is a no-op if it's the first run of the action, and we haven't generated a + // rollup index name) + CleanupTargetIndexStep cleanupRollupIndexStep = new CleanupTargetIndexStep( + cleanupRollupIndexKey, + readOnlyKey, + client, + (indexMetadata) -> IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.get(indexMetadata.getSettings()), + (indexMetadata) -> indexMetadata.getLifecycleExecutionState().rollupIndexName() + ); + // Mark source index as read-only ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, generateRollupIndexNameKey, client); + + // Generate a unique rollup index name and store it in the ILM execution state GenerateUniqueIndexNameStep generateRollupIndexNameStep = new GenerateUniqueIndexNameStep( generateRollupIndexNameKey, rollupKey, ROLLUP_INDEX_PREFIX, (rollupIndexName, lifecycleStateBuilder) -> lifecycleStateBuilder.setRollupIndexName(rollupIndexName) ); - if (rollupPolicy == null) { - Step rollupStep = new RollupStep(rollupKey, nextStepKey, client, config); - return List.of(checkNotWriteIndexStep, readOnlyStep, generateRollupIndexNameStep, rollupStep); - } else { - StepKey updateRollupIndexPolicyStepKey = new StepKey(phase, NAME, UpdateRollupIndexPolicyStep.NAME); - Step rollupStep = new RollupStep(rollupKey, updateRollupIndexPolicyStepKey, client, config); - Step updateRollupIndexPolicyStep = new UpdateRollupIndexPolicyStep( - updateRollupIndexPolicyStepKey, - nextStepKey, - client, - rollupPolicy - ); - return List.of(checkNotWriteIndexStep, readOnlyStep, generateRollupIndexNameStep, rollupStep, updateRollupIndexPolicyStep); - } + + // Here is where the actual rollup action takes place + RollupStep rollupStep = new RollupStep(rollupKey, waitForRollupIndexKey, client, fixedInterval); + + // Wait until the rollup index is recovered. We again wait until the configured threshold is breached and + // if the rollup index has not successfully recovered until then, we rewind to the "cleanup-rollup-index" + // step to delete this unsuccessful rollup index and retry the operation by generating a new rollup index + // name and attempting to rollup again + ClusterStateWaitUntilThresholdStep rollupAllocatedStep = new ClusterStateWaitUntilThresholdStep( + new WaitForIndexColorStep( + waitForRollupIndexKey, + copyMetadataKey, + ClusterHealthStatus.YELLOW, + (indexName, lifecycleState) -> lifecycleState.rollupIndexName() + ), + cleanupRollupIndexKey + ); + + CopyExecutionStateStep copyExecutionStateStep = new CopyExecutionStateStep( + copyMetadataKey, + dataStreamCheckBranchingKey, + (indexName, lifecycleState) -> lifecycleState.rollupIndexName(), + nextStepKey + ); + + // By the time we get to this step we have 2 indices, the source and the rollup one. We now need to choose an index + // swapping strategy such that the rollup index takes the place of the source index (which will also be deleted). + // If the source index is part of a data stream it's a matter of replacing it with the rollup index one in the data stream and + // then deleting the source index. + BranchingStep isDataStreamBranchingStep = new BranchingStep( + dataStreamCheckBranchingKey, + swapAliasesKey, + replaceDataStreamIndexKey, + (index, clusterState) -> { + IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(index.getName()); + assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found"; + return indexAbstraction.getParentDataStream() != null; + } + ); + + ReplaceDataStreamBackingIndexStep replaceDataStreamBackingIndex = new ReplaceDataStreamBackingIndexStep( + replaceDataStreamIndexKey, + deleteIndexKey, + (sourceIndexName, lifecycleState) -> lifecycleState.rollupIndexName() + ); + DeleteStep deleteSourceIndexStep = new DeleteStep(deleteIndexKey, nextStepKey, client); + + SwapAliasesAndDeleteSourceIndexStep swapAliasesAndDeleteSourceIndexStep = new SwapAliasesAndDeleteSourceIndexStep( + swapAliasesKey, + nextStepKey, + client, + (indexName, lifecycleState) -> lifecycleState.rollupIndexName(), + false + ); + + return List.of( + checkNotWriteIndexStep, + waitForNoFollowersStep, + cleanupRollupIndexStep, + readOnlyStep, + generateRollupIndexNameStep, + rollupStep, + rollupAllocatedStep, + copyExecutionStateStep, + isDataStreamBranchingStep, + replaceDataStreamBackingIndex, + deleteSourceIndexStep, + swapAliasesAndDeleteSourceIndexStep + ); } @Override @@ -137,13 +215,12 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; RollupILMAction that = (RollupILMAction) o; - - return Objects.equals(this.config, that.config) && Objects.equals(this.rollupPolicy, that.rollupPolicy); + return Objects.equals(this.fixedInterval, that.fixedInterval); } @Override public int hashCode() { - return Objects.hash(config, rollupPolicy); + return Objects.hash(fixedInterval); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupStep.java index 3ad55a2b15d24..200ab54292a84 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupStep.java @@ -6,7 +6,10 @@ */ package org.elasticsearch.xpack.core.ilm; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -14,23 +17,28 @@ import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.xpack.core.rollup.RollupActionConfig; import org.elasticsearch.xpack.core.rollup.action.RollupAction; import java.util.Objects; /** - * Rolls up index using a {@link RollupActionConfig} + * ILM step that invokes the rollup action for an index using a {@link DateHistogramInterval}. The rollup + * index name is retrieved from the lifecycle state {@link LifecycleExecutionState#rollupIndexName()} + * index. If a rollup index with the same name has been already successfully created, this step + * will be skipped. */ public class RollupStep extends AsyncActionStep { public static final String NAME = "rollup"; - public static final String ROLLUP_INDEX_NAME_PREFIX = "rollup-"; - private final RollupActionConfig config; + private static final Logger logger = LogManager.getLogger(RollupStep.class); - public RollupStep(StepKey key, StepKey nextStepKey, Client client, RollupActionConfig config) { + private final DateHistogramInterval fixedInterval; + + public RollupStep(StepKey key, StepKey nextStepKey, Client client, DateHistogramInterval fixedInterval) { super(key, nextStepKey, client); - this.config = config; + this.fixedInterval = fixedInterval; } @Override @@ -45,9 +53,13 @@ public void performAction( ClusterStateObserver observer, ActionListener listener ) { + LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState(); + if (lifecycleState.lifecycleDate() == null) { + throw new IllegalStateException("source index [" + indexMetadata.getIndex().getName() + "] is missing lifecycle date"); + } + final String policyName = indexMetadata.getLifecyclePolicyName(); final String indexName = indexMetadata.getIndex().getName(); - final LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState(); final String rollupIndexName = lifecycleState.rollupIndexName(); if (Strings.hasText(rollupIndexName) == false) { listener.onFailure( @@ -57,8 +69,64 @@ public void performAction( ); return; } + + IndexMetadata rollupIndexMetadata = currentState.metadata().index(rollupIndexName); + if (rollupIndexMetadata != null) { + IndexMetadata.RollupTaskStatus rollupIndexStatus = IndexMetadata.INDEX_ROLLUP_STATUS.get(rollupIndexMetadata.getSettings()); + // Rollup index has already been created with the generated name and its status is "success". + // So we skip index rollup creation. + if (IndexMetadata.RollupTaskStatus.SUCCESS.equals(rollupIndexStatus)) { + logger.warn( + "skipping [{}] step for index [{}] as part of policy [{}] as the rollup index [{}] already exists", + RollupStep.NAME, + indexName, + policyName, + rollupIndexName + ); + listener.onResponse(null); + } else { + logger.warn( + "[{}] step for index [{}] as part of policy [{}] found the rollup index [{}] already exists. Deleting it.", + RollupStep.NAME, + indexName, + policyName, + rollupIndexName + ); + // Rollup index has already been created with the generated name but its status is not "success". + // So we delete the index and proceed with executing the rollup step. + DeleteIndexRequest deleteRequest = new DeleteIndexRequest(rollupIndexName); + getClient().admin().indices().delete(deleteRequest, ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + performRollupIndex(indexName, rollupIndexName, listener); + } else { + listener.onFailure( + new IllegalStateException( + "failing [" + + RollupStep.NAME + + "] step for index [" + + indexName + + "] as part of policy [" + + policyName + + "] because the rollup index [" + + rollupIndexName + + "] already exists with rollup status [" + + rollupIndexStatus + + "]" + ) + ); + } + }, listener::onFailure)); + } + return; + } + + performRollupIndex(indexName, rollupIndexName, listener); + } + + private void performRollupIndex(String indexName, String rollupIndexName, ActionListener listener) { + RollupActionConfig config = new RollupActionConfig(fixedInterval); RollupAction.Request request = new RollupAction.Request(indexName, rollupIndexName, config).masterNodeTimeout(TimeValue.MAX_VALUE); - // currently RollupAction always acknowledges action was complete when no exceptions are thrown. + // Currently, RollupAction always acknowledges action was complete when no exceptions are thrown. getClient().execute( RollupAction.INSTANCE, request, @@ -66,13 +134,13 @@ public void performAction( ); } - public RollupActionConfig getConfig() { - return config; + public DateHistogramInterval getFixedInterval() { + return fixedInterval; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), config); + return Objects.hash(super.hashCode(), fixedInterval); } @Override @@ -84,6 +152,6 @@ public boolean equals(Object obj) { return false; } RollupStep other = (RollupStep) obj; - return super.equals(obj) && Objects.equals(config, other.config); + return super.equals(obj) && Objects.equals(fixedInterval, other.fixedInterval); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java index c3af8d8b440f4..5a6103467b760 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SearchableSnapshotAction.java @@ -289,7 +289,7 @@ public List toSteps(Client client, String phase, StepKey nextStepKey, XPac CopySettingsStep copySettingsStep = new CopySettingsStep( copyLifecyclePolicySettingKey, dataStreamCheckBranchingKey, - getRestoredIndexPrefix(copyLifecyclePolicySettingKey), + (index, lifecycleState) -> getRestoredIndexPrefix(copyLifecyclePolicySettingKey) + index, LifecycleSettings.LIFECYCLE_NAME ); BranchingStep isDataStreamBranchingStep = new BranchingStep( diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkSetAliasStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkSetAliasStep.java index 766149d7d2171..ecccd72e1f4d1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkSetAliasStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkSetAliasStep.java @@ -38,7 +38,7 @@ public void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState cu // get target shrink index LifecycleExecutionState lifecycleState = indexMetadata.getLifecycleExecutionState(); String targetIndexName = getShrinkIndexName(indexName, lifecycleState); - deleteSourceIndexAndTransferAliases(getClient(), indexMetadata, targetIndexName, listener); + deleteSourceIndexAndTransferAliases(getClient(), indexMetadata, targetIndexName, listener, true); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SwapAliasesAndDeleteSourceIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SwapAliasesAndDeleteSourceIndexStep.java index 39d2ae4ddea11..b5bbd65f0d2c7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SwapAliasesAndDeleteSourceIndexStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SwapAliasesAndDeleteSourceIndexStep.java @@ -14,10 +14,12 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.core.TimeValue; import java.util.Locale; import java.util.Objects; +import java.util.function.BiFunction; /** * This step swaps all the aliases from the source index to the restored index and deletes the source index. This is useful in scenarios @@ -27,11 +29,32 @@ public class SwapAliasesAndDeleteSourceIndexStep extends AsyncActionStep { public static final String NAME = "swap-aliases"; private static final Logger logger = LogManager.getLogger(SwapAliasesAndDeleteSourceIndexStep.class); - private final String targetIndexPrefix; + /** + * Supplier function that returns the name of the target index where aliases will + * point to + */ + private final BiFunction targetIndexNameSupplier; + + /** + * if true, this method will create an alias named as the source index and will link it + * to the target index + */ + private final boolean createSourceIndexAlias; public SwapAliasesAndDeleteSourceIndexStep(StepKey key, StepKey nextStepKey, Client client, String targetIndexPrefix) { + this(key, nextStepKey, client, (index, lifecycleState) -> targetIndexPrefix + index, true); + } + + public SwapAliasesAndDeleteSourceIndexStep( + StepKey key, + StepKey nextStepKey, + Client client, + BiFunction targetIndexNameSupplier, + boolean createSourceIndexAlias + ) { super(key, nextStepKey, client); - this.targetIndexPrefix = targetIndexPrefix; + this.targetIndexNameSupplier = targetIndexNameSupplier; + this.createSourceIndexAlias = createSourceIndexAlias; } @Override @@ -39,8 +62,12 @@ public boolean isRetryable() { return true; } - public String getTargetIndexPrefix() { - return targetIndexPrefix; + BiFunction getTargetIndexNameSupplier() { + return targetIndexNameSupplier; + } + + boolean getCreateSourceIndexAlias() { + return createSourceIndexAlias; } @Override @@ -51,7 +78,7 @@ public void performAction( ActionListener listener ) { String originalIndex = indexMetadata.getIndex().getName(); - final String targetIndexName = targetIndexPrefix + originalIndex; + final String targetIndexName = targetIndexNameSupplier.apply(originalIndex, indexMetadata.getLifecycleExecutionState()); IndexMetadata targetIndexMetadata = currentClusterState.metadata().index(targetIndexName); if (targetIndexMetadata == null) { @@ -68,7 +95,7 @@ public void performAction( return; } - deleteSourceIndexAndTransferAliases(getClient(), indexMetadata, targetIndexName, listener); + deleteSourceIndexAndTransferAliases(getClient(), indexMetadata, targetIndexName, listener, createSourceIndexAlias); } /** @@ -76,17 +103,24 @@ public void performAction( * index. *

* The is_write_index will *not* be set on the target index as this operation is currently executed on read-only indices. + * @param createSourceIndexAlias if true, this method will create an alias named as the source index and will link it + * to the target index */ static void deleteSourceIndexAndTransferAliases( Client client, IndexMetadata sourceIndex, String targetIndex, - ActionListener listener + ActionListener listener, + boolean createSourceIndexAlias ) { String sourceIndexName = sourceIndex.getIndex().getName(); IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest().masterNodeTimeout(TimeValue.MAX_VALUE) - .addAliasAction(IndicesAliasesRequest.AliasActions.removeIndex().index(sourceIndexName)) - .addAliasAction(IndicesAliasesRequest.AliasActions.add().index(targetIndex).alias(sourceIndexName)); + .addAliasAction(IndicesAliasesRequest.AliasActions.removeIndex().index(sourceIndexName)); + + if (createSourceIndexAlias) { + // create an alias with the same name as the source index and link it to the target index + aliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(targetIndex).alias(sourceIndexName)); + } // copy over other aliases from source index sourceIndex.getAliases().values().forEach(aliasMetaDataToAdd -> { // inherit all alias properties except `is_write_index` @@ -116,7 +150,7 @@ public boolean indexSurvives() { @Override public int hashCode() { - return Objects.hash(super.hashCode(), targetIndexPrefix); + return Objects.hash(super.hashCode(), targetIndexNameSupplier, createSourceIndexAlias); } @Override @@ -128,6 +162,8 @@ public boolean equals(Object obj) { return false; } SwapAliasesAndDeleteSourceIndexStep other = (SwapAliasesAndDeleteSourceIndexStep) obj; - return super.equals(obj) && Objects.equals(targetIndexPrefix, other.targetIndexPrefix); + return super.equals(obj) + && Objects.equals(targetIndexNameSupplier, other.targetIndexNameSupplier) + && createSourceIndexAlias == other.createSourceIndexAlias; } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java index 8e295f99c81e0..006fcde7a131f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java @@ -61,24 +61,25 @@ public class TimeseriesLifecycleType implements LifecycleType { ForceMergeAction.NAME, SearchableSnapshotAction.NAME ).filter(Objects::nonNull).toList(); - public static final List ORDERED_VALID_WARM_ACTIONS = Arrays.asList( + public static final List ORDERED_VALID_WARM_ACTIONS = Stream.of( SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME, + IndexSettings.isTimeSeriesModeEnabled() ? RollupILMAction.NAME : null, AllocateAction.NAME, MigrateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME - ); + ).filter(Objects::nonNull).toList(); public static final List ORDERED_VALID_COLD_ACTIONS = Stream.of( SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME, + IndexSettings.isTimeSeriesModeEnabled() ? RollupILMAction.NAME : null, SearchableSnapshotAction.NAME, AllocateAction.NAME, MigrateAction.NAME, - FreezeAction.NAME, - IndexSettings.isTimeSeriesModeEnabled() ? RollupILMAction.NAME : null + FreezeAction.NAME ).filter(Objects::nonNull).toList(); public static final List ORDERED_VALID_FROZEN_ACTIONS = List.of(UnfollowAction.NAME, SearchableSnapshotAction.NAME); public static final List ORDERED_VALID_DELETE_ACTIONS = List.of(WaitForSnapshotAction.NAME, DeleteAction.NAME); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForIndexColorStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForIndexColorStep.java index 8c3ce8c7d3967..59982b4d7931d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForIndexColorStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForIndexColorStep.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.core.Nullable; @@ -23,6 +24,7 @@ import java.io.IOException; import java.util.Locale; import java.util.Objects; +import java.util.function.BiFunction; /** * Wait Step for index based on color. Optionally derives the index name using the provided prefix (if any). @@ -34,30 +36,39 @@ class WaitForIndexColorStep extends ClusterStateWaitStep { private static final Logger logger = LogManager.getLogger(WaitForIndexColorStep.class); private final ClusterHealthStatus color; - @Nullable - private final String indexNamePrefix; + + private final BiFunction indexNameSupplier; WaitForIndexColorStep(StepKey key, StepKey nextStepKey, ClusterHealthStatus color) { - this(key, nextStepKey, color, null); + this(key, nextStepKey, color, (index, lifecycleState) -> index); } WaitForIndexColorStep(StepKey key, StepKey nextStepKey, ClusterHealthStatus color, @Nullable String indexNamePrefix) { + this(key, nextStepKey, color, (index, lifecycleState) -> indexNamePrefix + index); + } + + WaitForIndexColorStep( + StepKey key, + StepKey nextStepKey, + ClusterHealthStatus color, + BiFunction indexNameSupplier + ) { super(key, nextStepKey); this.color = color; - this.indexNamePrefix = indexNamePrefix; + this.indexNameSupplier = indexNameSupplier; } public ClusterHealthStatus getColor() { return this.color; } - public String getIndexNamePrefix() { - return indexNamePrefix; + BiFunction getIndexNameSupplier() { + return indexNameSupplier; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), this.color, this.indexNamePrefix); + return Objects.hash(super.hashCode(), this.color, this.indexNameSupplier); } @Override @@ -69,12 +80,15 @@ public boolean equals(Object obj) { return false; } WaitForIndexColorStep other = (WaitForIndexColorStep) obj; - return super.equals(obj) && Objects.equals(this.color, other.color) && Objects.equals(this.indexNamePrefix, other.indexNamePrefix); + return super.equals(obj) + && Objects.equals(this.color, other.color) + && Objects.equals(this.indexNameSupplier, other.indexNameSupplier); } @Override public Result isConditionMet(Index index, ClusterState clusterState) { - String indexName = indexNamePrefix != null ? indexNamePrefix + index.getName() : index.getName(); + LifecycleExecutionState lifecycleExecutionState = clusterState.metadata().index(index.getName()).getLifecycleExecutionState(); + String indexName = indexNameSupplier.apply(index.getName(), lifecycleExecutionState); IndexMetadata indexMetadata = clusterState.metadata().index(indexName); // check if the (potentially) derived index exists if (indexMetadata == null) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CleanupTargetIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CleanupTargetIndexStepTests.java new file mode 100644 index 0000000000000..efdaced753622 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CleanupTargetIndexStepTests.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.ilm; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.xpack.core.ilm.Step.StepKey; + +import java.util.Map; +import java.util.function.Function; + +import static org.elasticsearch.xpack.core.ilm.GenerateUniqueIndexNameStep.generateValidIndexName; +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.is; + +public class CleanupTargetIndexStepTests extends AbstractStepTestCase { + + @Override + public CleanupTargetIndexStep createRandomInstance() { + StepKey stepKey = randomStepKey(); + StepKey nextStepKey = randomStepKey(); + + return new CleanupTargetIndexStep( + stepKey, + nextStepKey, + client, + (indexMetadata) -> randomAlphaOfLengthBetween(1, 10), + (indexMetadata) -> randomAlphaOfLengthBetween(1, 10) + ); + } + + @Override + protected CleanupTargetIndexStep copyInstance(CleanupTargetIndexStep instance) { + return new CleanupTargetIndexStep( + instance.getKey(), + instance.getNextStepKey(), + instance.getClient(), + instance.getSourceIndexNameSupplier(), + instance.getTargetIndexNameSupplier() + ); + } + + @Override + public CleanupTargetIndexStep mutateInstance(CleanupTargetIndexStep instance) { + StepKey key = instance.getKey(); + StepKey nextKey = instance.getNextStepKey(); + Function sourceIndexNameSupplier = instance.getSourceIndexNameSupplier(); + Function targetIndexNameSupplier = instance.getTargetIndexNameSupplier(); + + switch (between(0, 3)) { + case 0 -> key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + case 1 -> nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + case 2 -> sourceIndexNameSupplier = (indexMetadata) -> randomAlphaOfLengthBetween(11, 15) + indexMetadata.getIndex().getName(); + case 3 -> targetIndexNameSupplier = (indexMetadata) -> randomAlphaOfLengthBetween(11, 15) + indexMetadata.getIndex().getName(); + default -> throw new AssertionError("Illegal randomisation branch"); + } + return new CleanupTargetIndexStep(key, nextKey, instance.getClient(), sourceIndexNameSupplier, targetIndexNameSupplier); + } + + public void testPerformActionDoesntFailIfShrinkingIndexNameIsMissing() { + String indexName = randomAlphaOfLength(10); + String policyName = "test-ilm-policy"; + + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .numberOfShards(randomIntBetween(1, 5)) + .numberOfReplicas(randomIntBetween(0, 5)); + + IndexMetadata indexMetadata = indexMetadataBuilder.build(); + ClusterState clusterState = ClusterState.builder(emptyClusterState()) + .metadata(Metadata.builder().put(indexMetadata, true).build()) + .build(); + + CleanupTargetIndexStep cleanupShrinkIndexStep = createRandomInstance(); + cleanupShrinkIndexStep.performAction(indexMetadata, clusterState, null, new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + fail( + "expecting the step to not report any failure if there isn't any shrink index name stored in the ILM execution " + + "state but got:" + + e.getMessage() + ); + } + }); + } + + public void testPerformAction() { + String indexName = randomAlphaOfLength(10); + String policyName = "test-ilm-policy"; + String shrinkIndexName = generateValidIndexName("shrink-", indexName); + Map ilmCustom = Map.of("shrink_index_name", shrinkIndexName); + + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, ilmCustom) + .numberOfShards(randomIntBetween(1, 5)) + .numberOfReplicas(randomIntBetween(0, 5)); + IndexMetadata indexMetadata = indexMetadataBuilder.build(); + + ClusterState clusterState = ClusterState.builder(emptyClusterState()) + .metadata(Metadata.builder().put(indexMetadata, true).build()) + .build(); + + try (NoOpClient client = getDeleteIndexRequestAssertingClient(shrinkIndexName)) { + CleanupTargetIndexStep step = new CleanupTargetIndexStep( + randomStepKey(), + randomStepKey(), + client, + (metadata) -> indexName, + (metadata) -> shrinkIndexName + ); + step.performAction(indexMetadata, clusterState, null, ActionListener.noop()); + } + } + + public void testDeleteSkippedIfManagedIndexIsShrunkAndSourceDoesntExist() { + String sourceIndex = randomAlphaOfLength(10); + String policyName = "test-ilm-policy"; + String shrinkIndexName = generateValidIndexName("shrink-", sourceIndex); + Map ilmCustom = Map.of("shrink_index_name", shrinkIndexName); + + IndexMetadata.Builder shrunkIndexMetadataBuilder = IndexMetadata.builder(shrinkIndexName) + .settings( + settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, policyName) + .put(IndexMetadata.INDEX_RESIZE_SOURCE_NAME_KEY, sourceIndex) + ) + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, ilmCustom) + .numberOfShards(randomIntBetween(1, 5)) + .numberOfReplicas(randomIntBetween(0, 5)); + IndexMetadata shrunkIndexMetadata = shrunkIndexMetadataBuilder.build(); + + ClusterState clusterState = ClusterState.builder(emptyClusterState()) + .metadata(Metadata.builder().put(shrunkIndexMetadata, true).build()) + .build(); + + try (NoOpClient client = getFailingIfCalledClient()) { + CleanupTargetIndexStep step = new CleanupTargetIndexStep( + randomStepKey(), + randomStepKey(), + client, + (metadata) -> sourceIndex, + (metadata) -> shrinkIndexName + ); + step.performAction(shrunkIndexMetadata, clusterState, null, ActionListener.noop()); + } + } + + private NoOpClient getDeleteIndexRequestAssertingClient(String shrinkIndexName) { + return new NoOpClient(getTestName()) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + assertThat(action.name(), is(DeleteIndexAction.NAME)); + assertTrue(request instanceof DeleteIndexRequest); + assertThat(((DeleteIndexRequest) request).indices(), arrayContaining(shrinkIndexName)); + } + }; + } + + private NoOpClient getFailingIfCalledClient() { + return new NoOpClient(getTestName()) { + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + throw new IllegalStateException( + "not expecting client to be called, but received request [" + request + "] for action [" + action + "]" + ); + } + }; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CopySettingsStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CopySettingsStepTests.java index 85e617202fb8a..57a748ca93cd1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CopySettingsStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CopySettingsStepTests.java @@ -9,8 +9,11 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.Metadata; +import java.util.function.BiFunction; + import static org.hamcrest.Matchers.is; public class CopySettingsStepTests extends AbstractStepTestCase { @@ -20,7 +23,7 @@ protected CopySettingsStep createRandomInstance() { return new CopySettingsStep( randomStepKey(), randomStepKey(), - randomAlphaOfLengthBetween(1, 10), + (index, lifecycleState) -> randomAlphaOfLengthBetween(1, 10) + index, IndexMetadata.SETTING_NUMBER_OF_SHARDS ); } @@ -29,22 +32,27 @@ protected CopySettingsStep createRandomInstance() { protected CopySettingsStep mutateInstance(CopySettingsStep instance) { Step.StepKey key = instance.getKey(); Step.StepKey nextKey = instance.getNextStepKey(); - String indexPrefix = instance.getIndexPrefix(); + BiFunction targetIndexNameSupplier = instance.getTargetIndexNameSupplier(); String[] settingsKeys = instance.getSettingsKeys(); switch (between(0, 3)) { case 0 -> key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); case 1 -> nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); - case 2 -> indexPrefix = randomValueOtherThan(indexPrefix, () -> randomAlphaOfLengthBetween(1, 10)); - case 3 -> settingsKeys = new String[] { randomAlphaOfLengthBetween(1, 10) }; + case 2 -> settingsKeys = new String[] { randomAlphaOfLengthBetween(1, 10) }; + case 3 -> targetIndexNameSupplier = (index, state) -> randomAlphaOfLengthBetween(11, 15) + index; default -> throw new AssertionError("Illegal randomisation branch"); } - return new CopySettingsStep(key, nextKey, indexPrefix, settingsKeys); + return new CopySettingsStep(key, nextKey, targetIndexNameSupplier, settingsKeys); } @Override protected CopySettingsStep copyInstance(CopySettingsStep instance) { - return new CopySettingsStep(instance.getKey(), instance.getNextStepKey(), instance.getIndexPrefix(), instance.getSettingsKeys()); + return new CopySettingsStep( + instance.getKey(), + instance.getNextStepKey(), + instance.getTargetIndexNameSupplier(), + instance.getSettingsKeys() + ); } public void testPerformAction() { @@ -71,7 +79,7 @@ public void testPerformAction() { CopySettingsStep copySettingsStep = new CopySettingsStep( randomStepKey(), randomStepKey(), - indexPrefix, + (sourceIndexName, lifecycleState) -> indexPrefix + indexName, LifecycleSettings.LIFECYCLE_NAME ); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupILMActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupILMActionTests.java index 42421ac761a82..c82ef50c9be71 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupILMActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupILMActionTests.java @@ -12,18 +12,17 @@ import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; -import org.elasticsearch.xpack.core.rollup.RollupActionConfig; -import org.elasticsearch.xpack.core.rollup.RollupActionConfigTests; import java.util.List; +import static org.elasticsearch.xpack.core.ilm.RollupILMAction.CONDITIONAL_DATASTREAM_CHECK_KEY; import static org.elasticsearch.xpack.core.ilm.RollupILMAction.GENERATE_ROLLUP_STEP_NAME; import static org.hamcrest.Matchers.equalTo; public class RollupILMActionTests extends AbstractActionTestCase { static RollupILMAction randomInstance() { - return new RollupILMAction(RollupActionConfigTests.randomConfig(random()), randomBoolean() ? randomAlphaOfLength(5) : null); + return new RollupILMAction(ConfigTestHelpers.randomInterval()); } @Override @@ -48,7 +47,7 @@ public boolean isSafeAction() { @Override public void testToSteps() { - RollupILMAction action = new RollupILMAction(RollupActionConfigTests.randomConfig(random()), null); + RollupILMAction action = new RollupILMAction(ConfigTestHelpers.randomInterval()); String phase = randomAlphaOfLengthBetween(1, 10); StepKey nextStepKey = new StepKey( randomAlphaOfLengthBetween(1, 10), @@ -57,15 +56,57 @@ public void testToSteps() { ); List steps = action.toSteps(null, phase, nextStepKey); assertNotNull(steps); - assertEquals(4, steps.size()); + assertEquals(12, steps.size()); + + assertTrue(steps.get(0) instanceof CheckNotDataStreamWriteIndexStep); assertThat(steps.get(0).getKey().getName(), equalTo(CheckNotDataStreamWriteIndexStep.NAME)); - assertThat(steps.get(0).getNextStepKey().getName(), equalTo(ReadOnlyStep.NAME)); - assertThat(steps.get(1).getKey().getName(), equalTo(ReadOnlyStep.NAME)); - assertThat(steps.get(1).getNextStepKey().getName(), equalTo(GENERATE_ROLLUP_STEP_NAME)); - assertThat(steps.get(2).getKey().getName(), equalTo(GENERATE_ROLLUP_STEP_NAME)); - assertThat(steps.get(2).getNextStepKey().getName(), equalTo(RollupStep.NAME)); - assertThat(steps.get(3).getKey().getName(), equalTo(RollupStep.NAME)); - assertThat(steps.get(3).getNextStepKey(), equalTo(nextStepKey)); + assertThat(steps.get(0).getNextStepKey().getName(), equalTo(WaitForNoFollowersStep.NAME)); + + assertTrue(steps.get(1) instanceof WaitForNoFollowersStep); + assertThat(steps.get(1).getKey().getName(), equalTo(WaitForNoFollowersStep.NAME)); + assertThat(steps.get(1).getNextStepKey().getName(), equalTo(CleanupTargetIndexStep.NAME)); + + assertTrue(steps.get(2) instanceof CleanupTargetIndexStep); + assertThat(steps.get(2).getKey().getName(), equalTo(CleanupTargetIndexStep.NAME)); + assertThat(steps.get(2).getNextStepKey().getName(), equalTo(ReadOnlyStep.NAME)); + + assertTrue(steps.get(3) instanceof ReadOnlyStep); + assertThat(steps.get(3).getKey().getName(), equalTo(ReadOnlyStep.NAME)); + assertThat(steps.get(3).getNextStepKey().getName(), equalTo(GENERATE_ROLLUP_STEP_NAME)); + + assertTrue(steps.get(4) instanceof GenerateUniqueIndexNameStep); + assertThat(steps.get(4).getKey().getName(), equalTo(GENERATE_ROLLUP_STEP_NAME)); + assertThat(steps.get(4).getNextStepKey().getName(), equalTo(RollupStep.NAME)); + + assertTrue(steps.get(5) instanceof RollupStep); + assertThat(steps.get(5).getKey().getName(), equalTo(RollupStep.NAME)); + assertThat(steps.get(5).getNextStepKey().getName(), equalTo(WaitForIndexColorStep.NAME)); + + assertTrue(steps.get(6) instanceof ClusterStateWaitUntilThresholdStep); + assertThat(steps.get(6).getKey().getName(), equalTo(WaitForIndexColorStep.NAME)); + assertThat(steps.get(6).getNextStepKey().getName(), equalTo(CopyExecutionStateStep.NAME)); + + assertTrue(steps.get(7) instanceof CopyExecutionStateStep); + assertThat(steps.get(7).getKey().getName(), equalTo(CopyExecutionStateStep.NAME)); + assertThat(steps.get(7).getNextStepKey().getName(), equalTo(CONDITIONAL_DATASTREAM_CHECK_KEY)); + + assertTrue(steps.get(8) instanceof BranchingStep); + assertThat(steps.get(8).getKey().getName(), equalTo(CONDITIONAL_DATASTREAM_CHECK_KEY)); + expectThrows(IllegalStateException.class, () -> steps.get(8).getNextStepKey()); + assertThat(((BranchingStep) steps.get(8)).getNextStepKeyOnFalse().getName(), equalTo(SwapAliasesAndDeleteSourceIndexStep.NAME)); + assertThat(((BranchingStep) steps.get(8)).getNextStepKeyOnTrue().getName(), equalTo(ReplaceDataStreamBackingIndexStep.NAME)); + + assertTrue(steps.get(9) instanceof ReplaceDataStreamBackingIndexStep); + assertThat(steps.get(9).getKey().getName(), equalTo(ReplaceDataStreamBackingIndexStep.NAME)); + assertThat(steps.get(9).getNextStepKey().getName(), equalTo(DeleteStep.NAME)); + + assertTrue(steps.get(10) instanceof DeleteStep); + assertThat(steps.get(10).getKey().getName(), equalTo(DeleteStep.NAME)); + assertThat(steps.get(10).getNextStepKey(), equalTo(nextStepKey)); + + assertTrue(steps.get(11) instanceof SwapAliasesAndDeleteSourceIndexStep); + assertThat(steps.get(11).getKey().getName(), equalTo(SwapAliasesAndDeleteSourceIndexStep.NAME)); + assertThat(steps.get(11).getNextStepKey(), equalTo(nextStepKey)); } public void testEqualsAndHashCode() { @@ -73,23 +114,11 @@ public void testEqualsAndHashCode() { } RollupILMAction copy(RollupILMAction rollupILMAction) { - return new RollupILMAction(rollupILMAction.config(), rollupILMAction.rollupPolicy()); + return new RollupILMAction(rollupILMAction.fixedInterval()); } RollupILMAction notCopy(RollupILMAction rollupILMAction) { - RollupActionConfig newConfig = rollupILMAction.config(); - String newRollupPolicy = rollupILMAction.rollupPolicy(); - switch (randomIntBetween(0, 1)) { - case 0 -> { - DateHistogramInterval fixedInterval = randomValueOtherThan( - rollupILMAction.config().getFixedInterval(), - ConfigTestHelpers::randomInterval - ); - newConfig = new RollupActionConfig(fixedInterval); - } - case 1 -> newRollupPolicy = randomAlphaOfLength(3); - default -> throw new IllegalStateException("unreachable branch"); - } - return new RollupILMAction(newConfig, newRollupPolicy); + DateHistogramInterval fixedInterval = randomValueOtherThan(rollupILMAction.fixedInterval(), ConfigTestHelpers::randomInterval); + return new RollupILMAction(fixedInterval); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupStepTests.java index 4e7cbc1bab050..264c3e7e3966c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupStepTests.java @@ -16,17 +16,18 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.xpack.core.ilm.Step.StepKey; -import org.elasticsearch.xpack.core.rollup.RollupActionConfig; -import org.elasticsearch.xpack.core.rollup.RollupActionConfigTests; +import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.action.RollupAction; import org.mockito.Mockito; -import java.util.Collections; import java.util.List; import java.util.Map; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; +import static org.elasticsearch.cluster.metadata.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY; +import static org.elasticsearch.xpack.core.ilm.RollupILMAction.ROLLUP_INDEX_PREFIX; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -37,36 +38,44 @@ public class RollupStepTests extends AbstractStepTestCase { public RollupStep createRandomInstance() { StepKey stepKey = randomStepKey(); StepKey nextStepKey = randomStepKey(); - RollupActionConfig config = RollupActionConfigTests.randomConfig(random()); - return new RollupStep(stepKey, nextStepKey, client, config); + DateHistogramInterval fixedInterval = ConfigTestHelpers.randomInterval(); + return new RollupStep(stepKey, nextStepKey, client, fixedInterval); } @Override public RollupStep mutateInstance(RollupStep instance) { StepKey key = instance.getKey(); StepKey nextKey = instance.getNextStepKey(); + DateHistogramInterval fixedInterval = instance.getFixedInterval(); - switch (between(0, 1)) { + switch (between(0, 2)) { case 0 -> key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); case 1 -> nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + case 2 -> fixedInterval = ConfigTestHelpers.randomInterval(); default -> throw new AssertionError("Illegal randomisation branch"); } - return new RollupStep(key, nextKey, instance.getClient(), instance.getConfig()); + return new RollupStep(key, nextKey, instance.getClient(), fixedInterval); } @Override public RollupStep copyInstance(RollupStep instance) { - return new RollupStep(instance.getKey(), instance.getNextStepKey(), instance.getClient(), instance.getConfig()); + return new RollupStep(instance.getKey(), instance.getNextStepKey(), instance.getClient(), instance.getFixedInterval()); } - private IndexMetadata getIndexMetadata(String index) { - Map ilmCustom = Collections.singletonMap("rollup_index_name", "rollup-index"); + private IndexMetadata getIndexMetadata(String index, String lifecycleName, RollupStep step) { + LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(); + lifecycleState.setPhase(step.getKey().getPhase()); + lifecycleState.setAction(step.getKey().getAction()); + lifecycleState.setStep(step.getKey().getName()); + lifecycleState.setIndexCreationDate(randomNonNegativeLong()); + lifecycleState.setRollupIndexName("rollup-index"); + return IndexMetadata.builder(index) - .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, "test-ilm-policy")) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, lifecycleName)) .numberOfShards(randomIntBetween(1, 5)) .numberOfReplicas(randomIntBetween(0, 5)) - .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, ilmCustom) + .putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.build().asMap()) .build(); } @@ -77,11 +86,11 @@ private static void assertRollupActionRequest(RollupAction.Request request, Stri } public void testPerformAction() throws Exception { - String index = randomAlphaOfLength(5); - IndexMetadata indexMetadata = getIndexMetadata(index); - + String lifecycleName = randomAlphaOfLength(5); RollupStep step = createRandomInstance(); + String index = randomAlphaOfLength(5); + IndexMetadata indexMetadata = getIndexMetadata(index, lifecycleName, step); mockClientRollupCall(index); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().put(indexMetadata, true)).build(); @@ -89,14 +98,24 @@ public void testPerformAction() throws Exception { } public void testPerformActionFailureInvalidExecutionState() { + String lifecycleName = randomAlphaOfLength(5); + RollupStep step = createRandomInstance(); + + LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(); + lifecycleState.setPhase(step.getKey().getPhase()); + lifecycleState.setAction(step.getKey().getAction()); + lifecycleState.setStep(step.getKey().getName()); + lifecycleState.setIndexCreationDate(randomNonNegativeLong()); + IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLength(10)) - .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, "test-ilm-policy")) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, lifecycleName)) .numberOfShards(randomIntBetween(1, 5)) .numberOfReplicas(randomIntBetween(0, 5)) + .putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.build().asMap()) .build(); + String policyName = indexMetadata.getLifecyclePolicyName(); String indexName = indexMetadata.getIndex().getName(); - RollupStep step = createRandomInstance(); step.performAction(indexMetadata, emptyClusterState(), null, new ActionListener<>() { @Override public void onResponse(Void unused) { @@ -115,11 +134,11 @@ public void onFailure(Exception e) { } public void testPerformActionOnDataStream() throws Exception { + RollupStep step = createRandomInstance(); + String lifecycleName = randomAlphaOfLength(5); String dataStreamName = "test-datastream"; String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); - IndexMetadata indexMetadata = getIndexMetadata(backingIndexName); - - RollupStep step = createRandomInstance(); + IndexMetadata indexMetadata = getIndexMetadata(backingIndexName, lifecycleName, step); mockClientRollupCall(backingIndexName); @@ -129,6 +148,101 @@ public void testPerformActionOnDataStream() throws Exception { PlainActionFuture.get(f -> step.performAction(indexMetadata, clusterState, null, f)); } + /** + * Test rollup step when a successfully completed rollup index already exists. + */ + public void testPerformActionCompletedRollupIndexExists() { + String sourceIndexName = randomAlphaOfLength(10); + String lifecycleName = randomAlphaOfLength(5); + RollupStep step = createRandomInstance(); + + LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(); + lifecycleState.setPhase(step.getKey().getPhase()); + lifecycleState.setAction(step.getKey().getAction()); + lifecycleState.setStep(step.getKey().getName()); + lifecycleState.setIndexCreationDate(randomNonNegativeLong()); + + String rollupIndex = GenerateUniqueIndexNameStep.generateValidIndexName(ROLLUP_INDEX_PREFIX, sourceIndexName); + lifecycleState.setRollupIndexName(rollupIndex); + + IndexMetadata sourceIndexMetadata = IndexMetadata.builder(sourceIndexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, lifecycleName)) + .putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.build().asMap()) + .numberOfShards(randomIntBetween(1, 5)) + .numberOfReplicas(randomIntBetween(0, 5)) + .build(); + + // Create a successfully completed rollup index (index.rollup.status: success) + IndexMetadata indexMetadata = IndexMetadata.builder(rollupIndex) + .settings(settings(Version.CURRENT).put(IndexMetadata.INDEX_ROLLUP_STATUS.getKey(), IndexMetadata.RollupTaskStatus.SUCCESS)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + Map indices = Map.of(rollupIndex, indexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().indices(indices)).build(); + + Mockito.doThrow(new IllegalStateException("Rollup action should not be invoked")) + .when(client) + .execute(Mockito.any(), Mockito.any(), Mockito.any()); + + step.performAction(sourceIndexMetadata, clusterState, null, new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + fail("onFailure should not be called in this test, called with exception: " + e.getMessage()); + } + }); + } + + /** + * Test rollup step when an in-progress rollup index already exists. + */ + public void testPerformActionRollupInProgressIndexExists() { + String sourceIndexName = randomAlphaOfLength(10); + String lifecycleName = randomAlphaOfLength(5); + RollupStep step = createRandomInstance(); + + LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(); + lifecycleState.setPhase(step.getKey().getPhase()); + lifecycleState.setAction(step.getKey().getAction()); + lifecycleState.setStep(step.getKey().getName()); + lifecycleState.setIndexCreationDate(randomNonNegativeLong()); + + String rollupIndex = GenerateUniqueIndexNameStep.generateValidIndexName(ROLLUP_INDEX_PREFIX, sourceIndexName); + lifecycleState.setRollupIndexName(rollupIndex); + + IndexMetadata sourceIndexMetadata = IndexMetadata.builder(sourceIndexName) + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME, lifecycleName)) + .putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.build().asMap()) + .numberOfShards(randomIntBetween(1, 5)) + .numberOfReplicas(randomIntBetween(0, 5)) + .build(); + + // Create an in-progress rollup index (index.rollup.status: started) + IndexMetadata indexMetadata = IndexMetadata.builder(rollupIndex) + .settings(settings(Version.CURRENT).put(IndexMetadata.INDEX_ROLLUP_STATUS.getKey(), IndexMetadata.RollupTaskStatus.STARTED)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + Map indices = Map.of(rollupIndex, indexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().indices(indices)).build(); + + step.performAction(sourceIndexMetadata, clusterState, null, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + fail("onResponse should not be called in this test, because there's an in-progress rollup index"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof IllegalStateException); + assertTrue(e.getMessage().contains("already exists with rollup status [started]")); + } + }); + } + private void mockClientRollupCall(String sourceIndex) { Mockito.doAnswer(invocation -> { RollupAction.Request request = (RollupAction.Request) invocation.getArguments()[1]; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SwapAliasesAndDeleteSourceIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SwapAliasesAndDeleteSourceIndexStepTests.java index fa634813e3fc4..40c1f787f4a2b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SwapAliasesAndDeleteSourceIndexStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SwapAliasesAndDeleteSourceIndexStepTests.java @@ -17,12 +17,14 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.xpack.core.ilm.Step.StepKey; import java.util.Arrays; import java.util.List; +import java.util.function.BiFunction; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -43,7 +45,8 @@ protected SwapAliasesAndDeleteSourceIndexStep copyInstance(SwapAliasesAndDeleteS instance.getKey(), instance.getNextStepKey(), instance.getClient(), - instance.getTargetIndexPrefix() + instance.getTargetIndexNameSupplier(), + instance.getCreateSourceIndexAlias() ); } @@ -51,14 +54,16 @@ protected SwapAliasesAndDeleteSourceIndexStep copyInstance(SwapAliasesAndDeleteS public SwapAliasesAndDeleteSourceIndexStep mutateInstance(SwapAliasesAndDeleteSourceIndexStep instance) { StepKey key = instance.getKey(); StepKey nextKey = instance.getNextStepKey(); - String restoredIndexPrefix = instance.getTargetIndexPrefix(); - switch (between(0, 2)) { + BiFunction indexNameSupplier = instance.getTargetIndexNameSupplier(); + boolean createSourceIndexAlias = instance.getCreateSourceIndexAlias(); + switch (between(0, 3)) { case 0 -> key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); case 1 -> nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); - case 2 -> restoredIndexPrefix += randomAlphaOfLength(5); + case 2 -> indexNameSupplier = (index, state) -> index + randomAlphaOfLength(5); + case 3 -> createSourceIndexAlias = createSourceIndexAlias == false; default -> throw new AssertionError("Illegal randomisation branch"); } - return new SwapAliasesAndDeleteSourceIndexStep(key, nextKey, instance.getClient(), restoredIndexPrefix); + return new SwapAliasesAndDeleteSourceIndexStep(key, nextKey, instance.getClient(), indexNameSupplier, createSourceIndexAlias); } public void testPerformAction() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java index e1e817477b159..3e577a63dd4d2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.rollup.RollupActionConfig; import java.util.ArrayList; import java.util.Arrays; @@ -72,7 +71,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase { // keeping the migrate action disabled as otherwise it could conflict with the allocate action if both are randomly selected for the // same phase private static final MigrateAction TEST_MIGRATE_ACTION = MigrateAction.DISABLED; - private static final RollupILMAction TEST_ROLLUP_ACTION = new RollupILMAction(new RollupActionConfig(DateHistogramInterval.DAY), null); + private static final RollupILMAction TEST_ROLLUP_ACTION = new RollupILMAction(DateHistogramInterval.DAY); public void testValidatePhases() { boolean invalid = randomBoolean(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForIndexColorStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForIndexColorStepTests.java index 9e1b7a09dc6a2..9259c63f243e9 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForIndexColorStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForIndexColorStepTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.LifecycleExecutionState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; @@ -21,6 +22,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xpack.core.ilm.Step.StepKey; +import java.util.function.BiFunction; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -48,7 +51,7 @@ protected WaitForIndexColorStep mutateInstance(WaitForIndexColorStep instance) { StepKey key = instance.getKey(); StepKey nextKey = instance.getNextStepKey(); ClusterHealthStatus color = instance.getColor(), newColor = randomColor(); - String indexPrefix = instance.getIndexNamePrefix(); + BiFunction indexNameSupplier = instance.getIndexNameSupplier(); while (color.equals(newColor)) { newColor = randomColor(); @@ -60,12 +63,17 @@ protected WaitForIndexColorStep mutateInstance(WaitForIndexColorStep instance) { case 2 -> color = newColor; } - return new WaitForIndexColorStep(key, nextKey, color, indexPrefix); + return new WaitForIndexColorStep(key, nextKey, color, indexNameSupplier); } @Override protected WaitForIndexColorStep copyInstance(WaitForIndexColorStep instance) { - return new WaitForIndexColorStep(instance.getKey(), instance.getNextStepKey(), instance.getColor(), instance.getIndexNamePrefix()); + return new WaitForIndexColorStep( + instance.getKey(), + instance.getNextStepKey(), + instance.getColor(), + instance.getIndexNameSupplier() + ); } public void testConditionMetForGreen() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupActionConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupActionConfigTests.java index 3c199cdc57564..4018cd25802da 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupActionConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupActionConfigTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; -import java.util.Random; import static org.hamcrest.Matchers.equalTo; @@ -20,10 +19,10 @@ public class RollupActionConfigTests extends AbstractSerializingTestCase getOnlyIndexSettings(RestClient client, String public static void createIndexWithSettings(RestClient client, String index, String alias, Settings.Builder settings) throws IOException { - createIndexWithSettings(client, index, alias, settings, randomBoolean()); + createIndexWithSettings(client, index, alias, settings, null); + } + + public static void createIndexWithSettings(RestClient client, String index, String alias, Settings.Builder settings, String mapping) + throws IOException { + createIndexWithSettings(client, index, alias, settings, mapping, randomBoolean()); + } + + public static void createIndexWithSettings( + RestClient client, + String index, + String alias, + Settings.Builder settings, + boolean useWriteIndex + ) throws IOException { + createIndexWithSettings(client, index, alias, settings, null, useWriteIndex); } public static void createIndexWithSettings( @@ -297,6 +318,7 @@ public static void createIndexWithSettings( String index, String alias, Settings.Builder settings, + String mapping, boolean useWriteIndex ) throws IOException { Request request = new Request("PUT", "/" + index); @@ -305,11 +327,13 @@ public static void createIndexWithSettings( if (useWriteIndex) { writeIndexSnippet = "\"is_write_index\": true"; } + String m = mapping != null ? "\"mappings\": %s, ".formatted(mapping) : ""; request.setJsonEntity(""" { "settings": %s, + %s "aliases" : { "%s": { %s } } - }""".formatted(Strings.toString(settings.build()), alias, writeIndexSnippet)); + }""".formatted(Strings.toString(settings.build()), m, alias, writeIndexSnippet)); client.performRequest(request); // wait for the shards to initialize ensureGreen(index); @@ -418,4 +442,12 @@ public static String waitAndGetShrinkIndexName(RestClient client, String origina logger.info("--> original index name is [{}], shrunken index name is [{}]", originalIndex, shrunkenIndexName[0]); return shrunkenIndexName[0]; } + + public static Template getTemplate(String policyName) { + return new Template(getLifecycleSettings(policyName), null, null); + } + + public static Settings getLifecycleSettings(String policyName) { + return Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).build(); + } } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java index 65f4538a5b705..78745c29c4d03 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java @@ -7,13 +7,10 @@ package org.elasticsearch.xpack.ilm; -import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Template; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.test.rest.ESRestTestCase; @@ -22,7 +19,6 @@ import org.elasticsearch.xpack.core.ilm.DeleteAction; import org.elasticsearch.xpack.core.ilm.ForceMergeAction; import org.elasticsearch.xpack.core.ilm.FreezeAction; -import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; import org.elasticsearch.xpack.core.ilm.RolloverAction; @@ -31,7 +27,6 @@ import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep; import org.junit.Before; -import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Locale; @@ -44,6 +39,7 @@ import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex; import static org.elasticsearch.xpack.TimeSeriesRestDriver.getOnlyIndexSettings; import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.getTemplate; import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument; import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition; import static org.elasticsearch.xpack.TimeSeriesRestDriver.waitAndGetShrinkIndexName; @@ -307,11 +303,4 @@ public void testDeleteOnlyIndexInDataStreamDeletesDataStream() throws Exception }); } - private static Template getTemplate(String policyName) throws IOException { - return new Template(getLifecycleSettings(policyName), null, null); - } - - private static Settings getLifecycleSettings(String policyName) { - return Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).build(); - } } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/RollupActionIT.java b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/RollupActionIT.java index 85e8c557a7267..623c713e20674 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/RollupActionIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/RollupActionIT.java @@ -7,93 +7,301 @@ package org.elasticsearch.xpack.ilm.actions; -import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexMetadata.RollupTaskStatus; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.rest.action.admin.indices.RestPutIndexTemplateAction; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xpack.core.ilm.CheckNotDataStreamWriteIndexStep; +import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; +import org.elasticsearch.xpack.core.ilm.Phase; +import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep; +import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.ilm.RollupILMAction; -import org.elasticsearch.xpack.core.rollup.RollupActionConfig; +import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.junit.Before; import java.io.IOException; +import java.time.Instant; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings; import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.explainIndex; import static org.elasticsearch.xpack.TimeSeriesRestDriver.getOnlyIndexSettings; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex; import static org.elasticsearch.xpack.TimeSeriesRestDriver.index; +import static org.elasticsearch.xpack.TimeSeriesRestDriver.rolloverMaxOneDocCondition; import static org.elasticsearch.xpack.TimeSeriesRestDriver.updatePolicy; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/68609") public class RollupActionIT extends ESRestTestCase { private String index; private String policy; private String alias; + private String dataStream; + + private static final String TEMPLATE = """ + { + "index_patterns": ["%s*"], + "template": { + "settings":{ + "index": { + "number_of_replicas": 0, + "number_of_shards": 1, + "mode": "time_series" + }, + "index.lifecycle.name": "%s" + }, + "mappings":{ + "properties": { + "@timestamp" : { + "type": "date" + }, + "metricset": { + "type": "keyword", + "time_series_dimension": true + }, + "volume": { + "type": "double", + "time_series_metric": "gauge" + } + } + } + }, + "data_stream": { } + }"""; @Before - public void refreshIndex() { + public void refreshAbstractions() { index = "index-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); policy = "policy-" + randomAlphaOfLength(5); alias = "alias-" + randomAlphaOfLength(5); - logger.info("--> running [{}] with index [{}], alias [{}] and policy [{}]", getTestName(), index, alias, policy); - } + dataStream = "ds-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - public void testRollupIndex() throws Exception { - createIndexWithSettings( - client(), + logger.info( + "--> running [{}] with index [{}], data stream [{}], alias [{}] and policy [{}]", + getTestName(), index, + dataStream, alias, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + policy ); - index(client(), index, "_id", "timestamp", "2020-01-01T05:10:00Z", "volume", 11.0); - RollupActionConfig rollupConfig = new RollupActionConfig(DateHistogramInterval.DAY); + } + + private void createIndex(String index, String alias) throws IOException { + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES) + .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of("metricset")) + .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), "2006-01-08T23:40:53.384Z") + .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), "2106-01-08T23:40:53.384Z") + .put(LifecycleSettings.LIFECYCLE_NAME, policy); + + XContentBuilder builder = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject("@timestamp") + .field("type", "date") + .endObject() + .startObject("metricset") + .field("type", "keyword") + .field("time_series_dimension", true) + .endObject() + .startObject("volume") + .field("type", "double") + .field("time_series_metric", "gauge") + .endObject() + .endObject() + .endObject(); + String mapping = Strings.toString(builder); + createIndexWithSettings(client(), index, alias, settings, mapping); + } + + public void testRollupIndex() throws Exception { + createIndex(index, alias); + index(client(), index, true, null, "@timestamp", "2020-01-01T05:10:00Z", "volume", 11.0, "metricset", randomAlphaOfLength(5)); - createNewSingletonPolicy(client(), policy, "cold", new RollupILMAction(rollupConfig, null)); + String phaseName = randomFrom("warm", "cold"); + createNewSingletonPolicy(client(), policy, phaseName, new RollupILMAction(ConfigTestHelpers.randomInterval())); updatePolicy(client(), index, policy); - assertBusy(() -> assertNotNull(getRollupIndexName(index))); - String rollupIndex = getRollupIndexName(index); - assertBusy(() -> assertTrue(indexExists(rollupIndex))); - assertBusy(() -> assertFalse(getOnlyIndexSettings(client(), rollupIndex).containsKey(LifecycleSettings.LIFECYCLE_NAME))); - assertBusy(() -> assertTrue(indexExists(index))); + String rollupIndex = waitAndGetRollupIndexName(client(), index); + assertNotNull("Cannot retrieve rollup index name", rollupIndex); + assertBusy(() -> assertTrue("Rollup index does not exist", indexExists(rollupIndex)), 30, TimeUnit.SECONDS); + assertBusy(() -> assertFalse("Source index should have been deleted", indexExists(index)), 30, TimeUnit.SECONDS); + assertBusy( + () -> assertThat(getStepKeyForIndex(client(), rollupIndex), equalTo(PhaseCompleteStep.finalStep(phaseName).getKey())), + 30, + TimeUnit.SECONDS + ); + assertBusy(() -> { + Map settings = getOnlyIndexSettings(client(), rollupIndex); + assertEquals(index, settings.get(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.getKey())); + assertEquals(RollupTaskStatus.SUCCESS.toString(), settings.get(IndexMetadata.INDEX_ROLLUP_STATUS.getKey())); + }); + assertBusy( + () -> assertTrue("Alias [" + alias + "] does not point to index [" + rollupIndex + "]", aliasExists(rollupIndex, alias)) + ); } - public void testRollupIndexAndSetNewRollupPolicy() throws Exception { - createIndexWithSettings( + public void testRollupIndexInTheHotPhase() throws Exception { + createIndex(index, alias); + index(client(), index, true, null, "@timestamp", "2020-01-01T05:10:00Z", "volume", 11.0, "metricset", randomAlphaOfLength(5)); + + ResponseException e = expectThrows( + ResponseException.class, + () -> createNewSingletonPolicy(client(), policy, "hot", new RollupILMAction(ConfigTestHelpers.randomInterval())) + ); + assertTrue( + e.getMessage().contains("the [rollup] action(s) may not be used in the [hot] phase without an accompanying [rollover] action") + ); + } + + public void testRollupIndexInTheHotPhaseAfterRollover() throws Exception { + String originalIndex = index + "-000001"; + + // add a policy + Map hotActions = Map.of( + RolloverAction.NAME, + new RolloverAction(null, null, null, 1L, null), + RollupILMAction.NAME, + new RollupILMAction(ConfigTestHelpers.randomInterval()) + ); + Map phases = Map.of("hot", new Phase("hot", TimeValue.ZERO, hotActions)); + LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases); + Request createPolicyRequest = new Request("PUT", "_ilm/policy/" + policy); + createPolicyRequest.setJsonEntity("{ \"policy\":" + Strings.toString(lifecyclePolicy) + "}"); + client().performRequest(createPolicyRequest); + + // and a template + Request createTemplateRequest = new Request("PUT", "_template/" + index); + createTemplateRequest.setJsonEntity(""" + { + "index_patterns": ["%s-*"], + "settings": { + "number_of_shards": %s, + "number_of_replicas": 0, + "index.lifecycle.name": "%s", + "index.lifecycle.rollover_alias": "%s" + } + }""".formatted(index, 1, policy, alias)); + createTemplateRequest.setOptions(expectWarnings(RestPutIndexTemplateAction.DEPRECATION_WARNING)); + client().performRequest(createTemplateRequest); + + // then create the index and index a document to trigger rollover + createIndex(originalIndex, alias); + index( client(), - index, - alias, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + originalIndex, + true, + null, + "@timestamp", + "2020-01-01T05:10:00Z", + "volume", + 11.0, + "metricset", + randomAlphaOfLength(5) ); - index(client(), index, "_id", "timestamp", "2020-01-01T05:10:00Z", "volume", 11.0); - RollupActionConfig rollupConfig = new RollupActionConfig(DateHistogramInterval.DAY); - createNewSingletonPolicy(client(), policy, "cold", new RollupILMAction(rollupConfig, policy)); - updatePolicy(client(), index, policy); + String rollupIndex = waitAndGetRollupIndexName(client(), originalIndex); + assertNotNull("Cannot retrieve rollup index name", rollupIndex); + assertBusy(() -> assertTrue("Rollup index does not exist", indexExists(rollupIndex)), 30, TimeUnit.SECONDS); + assertBusy(() -> assertFalse("Source index should have been deleted", indexExists(originalIndex)), 30, TimeUnit.SECONDS); + assertBusy( + () -> assertThat(getStepKeyForIndex(client(), rollupIndex), equalTo(PhaseCompleteStep.finalStep("hot").getKey())), + 30, + TimeUnit.SECONDS + ); + assertBusy(() -> { + Map settings = getOnlyIndexSettings(client(), rollupIndex); + assertEquals(originalIndex, settings.get(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.getKey())); + assertEquals(RollupTaskStatus.SUCCESS.toString(), settings.get(IndexMetadata.INDEX_ROLLUP_STATUS.getKey())); + }); + } + + public void testTsdbDataStreams() throws Exception { + // Create the ILM policy + createNewSingletonPolicy(client(), policy, "warm", new RollupILMAction(ConfigTestHelpers.randomInterval())); - assertBusy(() -> assertNotNull(getRollupIndexName(index))); - String rollupIndex = getRollupIndexName(index); - assertBusy(() -> assertTrue(indexExists(rollupIndex))); - assertBusy(() -> assertThat(getOnlyIndexSettings(client(), rollupIndex).get(LifecycleSettings.LIFECYCLE_NAME), equalTo(policy))); - assertBusy(() -> assertTrue(indexExists(index))); + // Create a template + Request createIndexTemplateRequest = new Request("POST", "/_index_template/" + dataStream); + createIndexTemplateRequest.setJsonEntity(TEMPLATE.formatted(dataStream, policy)); + assertOK(client().performRequest(createIndexTemplateRequest)); + + String now = DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(Instant.now()); + index(client(), dataStream, true, null, "@timestamp", now, "volume", 11.0, "metricset", randomAlphaOfLength(5)); + + String backingIndexName = DataStream.getDefaultBackingIndexName(dataStream, 1); + assertBusy( + () -> assertThat( + "index must wait in the " + CheckNotDataStreamWriteIndexStep.NAME + " until it is not the write index anymore", + explainIndex(client(), backingIndexName).get("step"), + is(CheckNotDataStreamWriteIndexStep.NAME) + ), + 30, + TimeUnit.SECONDS + ); + + // Manual rollover the original index such that it's not the write index in the data stream anymore + rolloverMaxOneDocCondition(client(), dataStream); + + String rollupIndex = waitAndGetRollupIndexName(client(), backingIndexName); + assertNotNull("Cannot retrieve rollup index name", rollupIndex); + assertBusy(() -> assertTrue("Rollup index does not exist", indexExists(rollupIndex)), 30, TimeUnit.SECONDS); + assertBusy(() -> assertFalse("Source index should have been deleted", indexExists(backingIndexName)), 30, TimeUnit.SECONDS); + assertBusy(() -> { + Map settings = getOnlyIndexSettings(client(), rollupIndex); + assertEquals(backingIndexName, settings.get(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.getKey())); + assertEquals(RollupTaskStatus.SUCCESS.toString(), settings.get(IndexMetadata.INDEX_ROLLUP_STATUS.getKey())); + }); } /** - * gets the generated rollup index name for a given index by looking at newly created indices that match the rollup index name pattern + * Gets the generated rollup index name for a given index by looking at newly created indices that match the rollup index name pattern * - * @param index the name of the source index used to generate the rollup index name + * @param originalIndexName the name of the source index used to generate the rollup index name * @return the name of the rollup index for a given index, null if none exist - * @throws IOException if request fails */ - private String getRollupIndexName(String index) throws IOException { - Response response = client().performRequest(new Request("GET", "/rollup-*-" + index)); + public String waitAndGetRollupIndexName(RestClient client, String originalIndexName) throws InterruptedException { + final String[] rollupIndexName = new String[1]; + waitUntil(() -> { + try { + rollupIndexName[0] = getRollupIndexName(client, originalIndexName); + return rollupIndexName[0] != null; + } catch (IOException e) { + return false; + } + }, 60, TimeUnit.SECONDS); + logger.info("--> original index name is [{}], rollup index name is [{}]", originalIndexName, rollupIndexName[0]); + return rollupIndexName[0]; + } + + public static String getRollupIndexName(RestClient client, String originalIndexName) throws IOException { + Response response = client.performRequest( + new Request("GET", "/" + RollupILMAction.ROLLUP_INDEX_PREFIX + "*-" + originalIndexName + "/?expand_wildcards=all") + ); Map asMap = responseAsMap(response); if (asMap.size() == 1) { return (String) asMap.keySet().toArray()[0]; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java index 36af383d42588..7f71565b58efd 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java @@ -626,7 +626,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { @Override public void onFailure(Exception deleteException) { - listener.onFailure(new ElasticsearchException("Unable to delete the temporary rollup index [" + rollupIndex + "]", e)); + listener.onFailure(new ElasticsearchException("Unable to delete rollup index [" + rollupIndex + "]", e)); } }); }