Skip to content

Commit

Permalink
Add Rollup ILM Action (#65633) (#68184)
Browse files Browse the repository at this point in the history
this commit introduces a new Rollup ILM Action that allows indices
to be rolled up according to a specific rollup config. The
action also allows for the new rolled up index to be associated with
a different policy than the original/source index.
  • Loading branch information
talevy authored Jan 29, 2021
1 parent 78df746 commit e8664de
Show file tree
Hide file tree
Showing 21 changed files with 881 additions and 42 deletions.
56 changes: 56 additions & 0 deletions docs/reference/ilm/actions/ilm-rollup.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[role="xpack"]
[[ilm-rollup]]
=== Rollup

Phases allowed: hot, cold.

Aggregates an index's time series data and stores the results in a new read-only
index. For example, you can roll up hourly data into daily or weekly summaries.

For more information about rollup, see the <<rollup-api, rollup action documentation>>

The name of the rolled up index will be the original index name of the managed index prefixed
with `rollup-`.

[[ilm-rollup-options]]
==== Rollup options
`config`::
(Required, integer)
The rollup configuration, a more detailed description of the
rollup configuration specification can be found <<rollup-api-request-body,here>>.

`rollup_policy`::
(Optional, string)
The name of an <<index-lifecycle-management, {ilm}>> ({ilm-init}) policy to associate
with the newly created rollup index.

[[ilm-rollup-ex]]
==== Example

[source,console]
--------------------------------------------------
PUT _ilm/policy/my_policy
{
"policy": {
"phases": {
"cold": {
"actions": {
"rollup" : {
"config": {
"groups": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1y"
}
},
"metrics": [
{ "field": "temperature", "metrics": [ "avg" ] }
]
}
}
}
}
}
}
}
--------------------------------------------------
26 changes: 19 additions & 7 deletions docs/reference/ilm/ilm-actions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<<ilm-allocate,Allocate>>::
Move shards to nodes with different performance characteristics
and reduce the number of replicas.
and reduce the number of replicas.

<<ilm-delete,Delete>>::
Permanently remove the index.
Expand All @@ -22,10 +22,10 @@ Move the index shards to the <<data-tiers, data tier>> that corresponds
to the current {ilm-init} phase.

<<ilm-readonly,Read only>>::
Block write operations to the index.
Block write operations to the index.

<<ilm-rollover,Rollover>>::
Remove the index as the write index for the rollover alias and
Remove the index as the write index for the rollover alias and
start indexing to a new index.

<<ilm-searchable-snapshot, Searchable snapshot>>::
Expand All @@ -35,17 +35,25 @@ and mount it as a searchable snapshot.

<<ilm-set-priority,Set priority>>::
Lower the priority of an index as it moves through the lifecycle
to ensure that hot indices are recovered first.
to ensure that hot indices are recovered first.

<<ilm-shrink,Shrink>>::
Reduce the number of primary shards by shrinking the index into a new index.

<<ilm-unfollow,Unfollow>>::
Convert a follower index to a regular index.
Performed automatically before a rollover, shrink, or searchable snapshot action.
Performed automatically before a rollover, shrink, or searchable snapshot action.

<<ilm-wait-for-snapshot,Wait for snapshot>>::
Ensure that a snapshot exists before deleting the index.
Ensure that a snapshot exists before deleting the index.

ifdef::permanently-unreleased-branch[]

<<ilm-rollup,Rollup>>::
Aggregates an index's time series data and stores the results in a new read-only
index. For example, you can roll up hourly data into daily or weekly summaries.

endif::[]

include::actions/ilm-allocate.asciidoc[]
include::actions/ilm-delete.asciidoc[]
Expand All @@ -59,3 +67,7 @@ include::actions/ilm-set-priority.asciidoc[]
include::actions/ilm-shrink.asciidoc[]
include::actions/ilm-unfollow.asciidoc[]
include::actions/ilm-wait-for-snapshot.asciidoc[]

ifdef::permanently-unreleased-branch[]
include::actions/ilm-rollup.asciidoc[]
endif::[]
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.elasticsearch.xpack.core.ilm.MigrateAction;
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.RollupILMAction;
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
Expand Down Expand Up @@ -345,7 +346,7 @@ static Settings additionalSettings(final Settings settings, final boolean enable

@Override
public List<ActionType<? extends ActionResponse>> getClientActions() {
List<ActionType<? extends ActionResponse>> actions = new ArrayList(Arrays.asList(
List<ActionType<? extends ActionResponse>> actions = new ArrayList<>(Arrays.asList(
// deprecation
DeprecationInfoAction.INSTANCE,
// graph
Expand Down Expand Up @@ -521,7 +522,7 @@ public List<ActionType<? extends ActionResponse>> getClientActions() {

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Stream.concat(
List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>(Stream.concat(
Arrays.asList(
// graph
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.GRAPH, GraphFeatureSetUsage::new),
Expand Down Expand Up @@ -686,7 +687,13 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.RUNTIME_FIELDS, RuntimeFieldsFeatureSetUsage::new)
).stream(),
MlEvaluationNamedXContentProvider.getNamedWriteables().stream()
).collect(toList());
).collect(toList()));

if (RollupV2.isEnabled()) {
namedWriteables.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new));
}

return namedWriteables;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
* A {@link LifecycleAction} which calls {@link org.elasticsearch.xpack.core.rollup.action.RollupAction} on an index
*/
public class RollupILMAction implements LifecycleAction {
public static final String NAME = "rollup";

private static final ParseField CONFIG_FIELD = new ParseField("config");
private static final ParseField POLICY_FIELD = new ParseField("rollup_policy");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<RollupILMAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new RollupILMAction((RollupActionConfig) a[0], (String) a[1]));

private final RollupActionConfig config;
private final String rollupPolicy;

static {
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> RollupActionConfig.fromXContent(p), CONFIG_FIELD, ObjectParser.ValueType.OBJECT);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), POLICY_FIELD);
}

public static RollupILMAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

public RollupILMAction(RollupActionConfig config, @Nullable String rollupPolicy) {
this.config = config;
this.rollupPolicy = rollupPolicy;
}

public RollupILMAction(StreamInput in) throws IOException {
this(new RollupActionConfig(in), in.readOptionalString());
}

@Override
public String getWriteableName() {
return NAME;
}

RollupActionConfig config() {
return config;
}

String rollupPolicy() {
return rollupPolicy;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CONFIG_FIELD.getPreferredName(), config);
if (rollupPolicy != null) {
builder.field(POLICY_FIELD.getPreferredName(), rollupPolicy);
}
builder.endObject();
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
config.writeTo(out);
out.writeOptionalString(rollupPolicy);
}

@Override
public boolean isSafeAction() {
return false;
}

@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey checkNotWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyStep.NAME);
StepKey rollupKey = new StepKey(phase, NAME, NAME);
CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNotWriteIndex,
readOnlyKey);
ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, rollupKey, client);
if (rollupPolicy == null) {
Step rollupStep = new RollupStep(rollupKey, nextStepKey, client, config);
return Arrays.asList(checkNotWriteIndexStep, readOnlyStep, rollupStep);
} else {
StepKey updateRollupIndexPolicyStepKey = new StepKey(phase, NAME, UpdateRollupIndexPolicyStep.NAME);
Step rollupStep = new RollupStep(rollupKey, updateRollupIndexPolicyStepKey, client, config);
Step updateRollupIndexPolicyStep = new UpdateRollupIndexPolicyStep(updateRollupIndexPolicyStepKey, nextStepKey,
client, rollupPolicy);
return Arrays.asList(checkNotWriteIndexStep, readOnlyStep, rollupStep, updateRollupIndexPolicyStep);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

RollupILMAction that = (RollupILMAction) o;

return Objects.equals(this.config, that.config)
&& Objects.equals(this.rollupPolicy, that.rollupPolicy);
}

@Override
public int hashCode() {
return Objects.hash(config, rollupPolicy);
}

@Override
public String toString() {
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
import org.elasticsearch.xpack.core.rollup.action.RollupAction;

import java.util.Objects;

/**
* Rolls up index using a {@link RollupActionConfig}
*/
public class RollupStep extends AsyncActionStep {
public static final String NAME = "rollup";
public static final String ROLLUP_INDEX_NAME_PREFIX = "rollup-";

private final RollupActionConfig config;

public RollupStep(StepKey key, StepKey nextStepKey, Client client, RollupActionConfig config) {
super(key, nextStepKey, client);
this.config = config;
}

public static String getRollupIndexName(String index) {
return ROLLUP_INDEX_NAME_PREFIX + index;
}

@Override
public boolean isRetryable() {
return true;
}

@Override
public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
String originalIndex = indexMetadata.getIndex().getName();
RollupAction.Request request = new RollupAction.Request(originalIndex, getRollupIndexName(originalIndex), config);
// currently RollupAction always acknowledges action was complete when no exceptions are thrown.
getClient().execute(RollupAction.INSTANCE, request,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}

public RollupActionConfig getConfig() {
return config;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), config);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
RollupStep other = (RollupStep) obj;
return super.equals(obj)
&& Objects.equals(config, other.config);
}
}
Loading

0 comments on commit e8664de

Please sign in to comment.