Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSDB: Downsampling copies all settings from source to rollup index #88565

Merged
merged 5 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugin/rollup/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
compileOnly project(path: xpackModule('core'))
compileOnly project(path: xpackModule('analytics'))
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
compileOnly project(path: xpackModule('ilm'))
compileOnly project(':modules:data-streams')
testImplementation(testArtifact(project(xpackModule('core'))))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.Index;
Expand All @@ -61,7 +63,6 @@
import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction;

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -79,6 +80,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
private final Client client;
private final ClusterService clusterService;
private final MetadataCreateIndexService metadataCreateIndexService;
private final IndexScopedSettings indexScopedSettings;

/**
* This is the cluster state task executor for cluster state update actions.
Expand Down Expand Up @@ -107,7 +109,8 @@ public TransportRollupAction(
ThreadPool threadPool,
MetadataCreateIndexService metadataCreateIndexService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings
) {
super(
RollupAction.NAME,
Expand All @@ -122,6 +125,7 @@ public TransportRollupAction(
this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN);
this.clusterService = clusterService;
this.metadataCreateIndexService = metadataCreateIndexService;
this.indexScopedSettings = indexScopedSettings;
}

@Override
Expand Down Expand Up @@ -242,19 +246,20 @@ protected void masterOperation(
client.execute(RollupIndexerAction.INSTANCE, rollupIndexerRequest, ActionListener.wrap(indexerResp -> {
if (indexerResp.isCreated()) {
// 4. Make rollup index read-only and set the correct number of replicas
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_BLOCKS_WRITE, true)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas())
.build();
UpdateSettingsRequest updateSettingsReq = new UpdateSettingsRequest(settings, rollupIndexName);
final Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true);
// Number of replicas had been previously set to 0 to speed up index population
if (sourceIndexMetadata.getNumberOfReplicas() > 0) {
settings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas());
}
Comment on lines +250 to +253
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this piece, why do we copy replicas only if they are greater than 0? How would this behave if someone had something like index.auto_expand_replicas: 0-2 where the number of replicas is variable? Shouldn't we be consistent and maintain the number of replicas as defined?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind this is that when we create the rollup index we explicitly set that num_of_replicas=0. We do so, because we don't want the index to replicate while we populate the index.

When all documents have been written in the index, we set the num_of_replicas equal to the num_of_replicas of the source index. Of course, if num_of_replicas of the source index is 0, we don't have to do anything, hence the conditional.

UpdateSettingsRequest updateSettingsReq = new UpdateSettingsRequest(settings.build(), rollupIndexName);
updateSettingsReq.setParentTask(parentTask);
client.admin().indices().updateSettings(updateSettingsReq, ActionListener.wrap(updateSettingsResponse -> {
if (updateSettingsResponse.isAcknowledged()) {
// 5. Refresh rollup index
refreshIndex(rollupIndexName, parentTask, ActionListener.wrap(refreshIndexResponse -> {
if (refreshIndexResponse.getFailedShards() == 0) {
// 6. Mark rollup index as "completed successfully"
updateRollupMetadata(sourceIndexName, rollupIndexName, request, ActionListener.wrap(resp -> {
updateRollupMetadata(rollupIndexName, request, ActionListener.wrap(resp -> {
if (resp.isAcknowledged()) {
// 7. Force-merge the rollup index to a single segment
forceMergeIndex(
Expand Down Expand Up @@ -427,42 +432,42 @@ public static String createRollupIndexMapping(
}

/**
* Copy index metadata from the source index to the rollup index.
* Copy index settings from the source index to the rollup index. Settings that
* have already been set in the rollup index will not be overridden.
*/
private IndexMetadata.Builder copyIndexMetadata(IndexMetadata sourceIndexMetadata, IndexMetadata rollupIndexMetadata) {
String sourceIndexName = sourceIndexMetadata.getIndex().getName();
// Copy index settings from the source index, but do not override the settings
// that already have been set in the rollup index
final Settings.Builder targetSettings = Settings.builder().put(rollupIndexMetadata.getSettings());
for (final String key : sourceIndexMetadata.getSettings().keySet()) {
final Setting<?> setting = indexScopedSettings.get(key);
if (setting == null) {
assert indexScopedSettings.isPrivateSetting(key) : "expected [" + key + "] to be private but it was not";
} else if (setting.getProperties().contains(Setting.Property.NotCopyableOnResize)) {
// we leverage the NotCopyableOnResize setting property for rollup, because
// the same rules with resize apply
continue;
}
// do not override settings that have already been set in the rollup index
if (targetSettings.keys().contains(key)) {
continue;
}
targetSettings.copy(key, sourceIndexMetadata.getSettings());
}

/*
* Add the source index name and UUID to the rollup index metadata.
* If the source index is a rollup index, we will add the name and UUID
* of the first index that we initially rolled up.
*/
String originalIndexName = IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.exists(sourceIndexMetadata.getSettings())
? IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.get(sourceIndexMetadata.getSettings())
: sourceIndexName;
String originalIndexUuid = IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.exists(sourceIndexMetadata.getSettings())
? IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.get(sourceIndexMetadata.getSettings())
: sourceIndexMetadata.getIndexUUID();

// Copy time series index settings from original index
List<String> indexRoutingPath = sourceIndexMetadata.getRoutingPaths();
Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(sourceIndexMetadata.getSettings());
Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(sourceIndexMetadata.getSettings());
IndexMode indexMode = IndexSettings.MODE.get(sourceIndexMetadata.getSettings());

return IndexMetadata.builder(rollupIndexMetadata)
.settings(
Settings.builder()
.put(rollupIndexMetadata.getSettings())
.put(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.getKey(), originalIndexName)
.put(IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.getKey(), originalIndexUuid)
.put(IndexMetadata.INDEX_HIDDEN_SETTING.getKey(), sourceIndexMetadata.isHidden())
// Add the time series index settings
.put(IndexSettings.MODE.getKey(), indexMode)
.putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), indexRoutingPath)
.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), startTime.toString())
.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), endTime.toString())
);
if (IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.exists(sourceIndexMetadata.getSettings()) == false
|| IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.exists(sourceIndexMetadata.getSettings()) == false) {
Index sourceIndex = sourceIndexMetadata.getIndex();
targetSettings.put(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.getKey(), sourceIndex.getName())
.put(IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.getKey(), sourceIndex.getUUID());
Comment on lines +466 to +467
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we already have a test for doing a rollup-of-a-rollup where the original index is maintained in the rollup source/uuid? If not, could you add one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now rollup-of-rollups does not fully work. I plan to add support for rollup-of-rollup in a following PR.
I will definitely add tests in that PR

This part is only plumbing that is going to be needed for rollups of rollups

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now rollup-of-rollups does not fully work.

As some subsequent work (not in this PR), does it make sense to reject the request if someone tries to rollup an index that's already been rolled up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In data streams this cannot happen because no two indices for the same period can co-exist.

When it comes to concrete indices, there are very valid use cases for allowing multiple rollup indices of the same source index. For example having multiple rollup intervals (hourlies, dailies etc) or having multiple timezones (right now we support only UTC, but tin the future we should support more timezones).

}

return IndexMetadata.builder(rollupIndexMetadata).settings(targetSettings);
}

/**
Expand Down Expand Up @@ -524,12 +529,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}, ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()), STATE_UPDATE_TASK_EXECUTOR);
}

private void updateRollupMetadata(
String sourceIndexName,
String rollupIndexName,
RollupAction.Request request,
ActionListener<AcknowledgedResponse> listener
) {
private void updateRollupMetadata(String rollupIndexName, RollupAction.Request request, ActionListener<AcknowledgedResponse> listener) {
// 6. Mark rollup index as "completed successfully" ("index.rollup.status": "success")
clusterService.submitStateUpdateTask(
"update-rollup-metadata [" + rollupIndexName + "]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand Down Expand Up @@ -58,10 +59,13 @@
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
import org.elasticsearch.xpack.core.rollup.action.RollupAction;
import org.elasticsearch.xpack.core.rollup.action.RollupActionRequestValidationException;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.rollup.Rollup;
import org.junit.Before;

Expand Down Expand Up @@ -104,7 +108,8 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
Rollup.class,
AnalyticsPlugin.class,
AggregateMetricMapperPlugin.class,
DataStreamsPlugin.class
DataStreamsPlugin.class,
IndexLifecycle.class
);
}

Expand All @@ -115,7 +120,7 @@ public void setup() {
startTime = randomLongBetween(946769284000L, 1607470084000L); // random date between 2000-2020
docCount = randomIntBetween(10, 9000);
numOfShards = randomIntBetween(1, 4);
numOfReplicas = 0; // Since this is a single node, we cannot have replicas
numOfReplicas = randomIntBetween(0, 3);

// Values for keyword dimensions
dimensionValues = new ArrayList<>(MAX_DIM_VALUES);
Expand Down Expand Up @@ -170,6 +175,38 @@ public void testRollupIndex() throws IOException {
assertRollupIndex(sourceIndex, rollupIndex, config);
}

public void testCopyIndexSettings() throws IOException {
Settings settings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_NAME, randomAlphaOfLength(5))
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.getKey(), randomAlphaOfLength(5))
.put(LifecycleSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING.getKey(), randomBoolean())
.build();
logger.info("Updating index [{}] with settings [{}]", sourceIndex, settings);

var updateSettingsReq = new UpdateSettingsRequest(settings, sourceIndex);
var r = client().admin().indices().updateSettings(updateSettingsReq).actionGet();
assertTrue("Update settings not acked", r.isAcknowledged());

RollupActionConfig config = new RollupActionConfig(randomInterval());
SourceSupplier sourceSupplier = () -> {
String ts = randomDateForInterval(config.getInterval());
return XContentFactory.jsonBuilder()
.startObject()
.field(FIELD_TIMESTAMP, ts)
.field(FIELD_DIMENSION_1, randomFrom(dimensionValues))
.field(FIELD_NUMERIC_1, randomInt())
.endObject();
};
bulkIndex(sourceSupplier);
prepareSourceIndex(sourceIndex);
rollup(sourceIndex, rollupIndex, config);

GetIndexResponse indexSettingsResp = client().admin().indices().prepareGetIndex().addIndices(rollupIndex).get();
for (String key : settings.keySet()) {
assertEquals(settings.get(key), indexSettingsResp.getSetting(rollupIndex, key));
}
}

public void testNullSourceIndexName() {
RollupActionConfig config = new RollupActionConfig(randomInterval());
ActionRequestValidationException exception = expectThrows(
Expand Down