Skip to content

Commit

Permalink
TSDB: Downsampling copies all settings from source to rollup index (#…
Browse files Browse the repository at this point in the history
…88565)

This PR modified the rollup action so that all index settings are copied from
the source index to the rollup index.

Relates to #85708
  • Loading branch information
csoulios authored Jul 25, 2022
1 parent 2a65683 commit b3cc4ef
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 44 deletions.
1 change: 1 addition & 0 deletions x-pack/plugin/rollup/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
compileOnly project(path: xpackModule('core'))
compileOnly project(path: xpackModule('analytics'))
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
compileOnly project(path: xpackModule('ilm'))
compileOnly project(':modules:data-streams')
testImplementation(testArtifact(project(xpackModule('core'))))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.Index;
Expand All @@ -61,7 +63,6 @@
import org.elasticsearch.xpack.core.rollup.action.RollupIndexerAction;

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -79,6 +80,7 @@ public class TransportRollupAction extends AcknowledgedTransportMasterNodeAction
private final Client client;
private final ClusterService clusterService;
private final MetadataCreateIndexService metadataCreateIndexService;
private final IndexScopedSettings indexScopedSettings;

/**
* This is the cluster state task executor for cluster state update actions.
Expand Down Expand Up @@ -107,7 +109,8 @@ public TransportRollupAction(
ThreadPool threadPool,
MetadataCreateIndexService metadataCreateIndexService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings
) {
super(
RollupAction.NAME,
Expand All @@ -122,6 +125,7 @@ public TransportRollupAction(
this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN);
this.clusterService = clusterService;
this.metadataCreateIndexService = metadataCreateIndexService;
this.indexScopedSettings = indexScopedSettings;
}

@Override
Expand Down Expand Up @@ -242,19 +246,20 @@ protected void masterOperation(
client.execute(RollupIndexerAction.INSTANCE, rollupIndexerRequest, ActionListener.wrap(indexerResp -> {
if (indexerResp.isCreated()) {
// 4. Make rollup index read-only and set the correct number of replicas
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_BLOCKS_WRITE, true)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas())
.build();
UpdateSettingsRequest updateSettingsReq = new UpdateSettingsRequest(settings, rollupIndexName);
final Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true);
// Number of replicas had been previously set to 0 to speed up index population
if (sourceIndexMetadata.getNumberOfReplicas() > 0) {
settings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas());
}
UpdateSettingsRequest updateSettingsReq = new UpdateSettingsRequest(settings.build(), rollupIndexName);
updateSettingsReq.setParentTask(parentTask);
client.admin().indices().updateSettings(updateSettingsReq, ActionListener.wrap(updateSettingsResponse -> {
if (updateSettingsResponse.isAcknowledged()) {
// 5. Refresh rollup index
refreshIndex(rollupIndexName, parentTask, ActionListener.wrap(refreshIndexResponse -> {
if (refreshIndexResponse.getFailedShards() == 0) {
// 6. Mark rollup index as "completed successfully"
updateRollupMetadata(sourceIndexName, rollupIndexName, request, ActionListener.wrap(resp -> {
updateRollupMetadata(rollupIndexName, request, ActionListener.wrap(resp -> {
if (resp.isAcknowledged()) {
// 7. Force-merge the rollup index to a single segment
forceMergeIndex(
Expand Down Expand Up @@ -427,42 +432,42 @@ public static String createRollupIndexMapping(
}

/**
* Copy index metadata from the source index to the rollup index.
* Copy index settings from the source index to the rollup index. Settings that
* have already been set in the rollup index will not be overridden.
*/
private IndexMetadata.Builder copyIndexMetadata(IndexMetadata sourceIndexMetadata, IndexMetadata rollupIndexMetadata) {
String sourceIndexName = sourceIndexMetadata.getIndex().getName();
// Copy index settings from the source index, but do not override the settings
// that already have been set in the rollup index
final Settings.Builder targetSettings = Settings.builder().put(rollupIndexMetadata.getSettings());
for (final String key : sourceIndexMetadata.getSettings().keySet()) {
final Setting<?> setting = indexScopedSettings.get(key);
if (setting == null) {
assert indexScopedSettings.isPrivateSetting(key) : "expected [" + key + "] to be private but it was not";
} else if (setting.getProperties().contains(Setting.Property.NotCopyableOnResize)) {
// we leverage the NotCopyableOnResize setting property for rollup, because
// the same rules with resize apply
continue;
}
// do not override settings that have already been set in the rollup index
if (targetSettings.keys().contains(key)) {
continue;
}
targetSettings.copy(key, sourceIndexMetadata.getSettings());
}

/*
* Add the source index name and UUID to the rollup index metadata.
* If the source index is a rollup index, we will add the name and UUID
* of the first index that we initially rolled up.
*/
String originalIndexName = IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.exists(sourceIndexMetadata.getSettings())
? IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.get(sourceIndexMetadata.getSettings())
: sourceIndexName;
String originalIndexUuid = IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.exists(sourceIndexMetadata.getSettings())
? IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.get(sourceIndexMetadata.getSettings())
: sourceIndexMetadata.getIndexUUID();

// Copy time series index settings from original index
List<String> indexRoutingPath = sourceIndexMetadata.getRoutingPaths();
Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(sourceIndexMetadata.getSettings());
Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(sourceIndexMetadata.getSettings());
IndexMode indexMode = IndexSettings.MODE.get(sourceIndexMetadata.getSettings());

return IndexMetadata.builder(rollupIndexMetadata)
.settings(
Settings.builder()
.put(rollupIndexMetadata.getSettings())
.put(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.getKey(), originalIndexName)
.put(IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.getKey(), originalIndexUuid)
.put(IndexMetadata.INDEX_HIDDEN_SETTING.getKey(), sourceIndexMetadata.isHidden())
// Add the time series index settings
.put(IndexSettings.MODE.getKey(), indexMode)
.putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), indexRoutingPath)
.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), startTime.toString())
.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), endTime.toString())
);
if (IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.exists(sourceIndexMetadata.getSettings()) == false
|| IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.exists(sourceIndexMetadata.getSettings()) == false) {
Index sourceIndex = sourceIndexMetadata.getIndex();
targetSettings.put(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME.getKey(), sourceIndex.getName())
.put(IndexMetadata.INDEX_ROLLUP_SOURCE_UUID.getKey(), sourceIndex.getUUID());
}

return IndexMetadata.builder(rollupIndexMetadata).settings(targetSettings);
}

/**
Expand Down Expand Up @@ -524,12 +529,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}, ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()), STATE_UPDATE_TASK_EXECUTOR);
}

private void updateRollupMetadata(
String sourceIndexName,
String rollupIndexName,
RollupAction.Request request,
ActionListener<AcknowledgedResponse> listener
) {
private void updateRollupMetadata(String rollupIndexName, RollupAction.Request request, ActionListener<AcknowledgedResponse> listener) {
// 6. Mark rollup index as "completed successfully" ("index.rollup.status": "success")
clusterService.submitStateUpdateTask(
"update-rollup-metadata [" + rollupIndexName + "]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand Down Expand Up @@ -58,10 +59,13 @@
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
import org.elasticsearch.xpack.core.rollup.action.RollupAction;
import org.elasticsearch.xpack.core.rollup.action.RollupActionRequestValidationException;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.rollup.Rollup;
import org.junit.Before;

Expand Down Expand Up @@ -104,7 +108,8 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
Rollup.class,
AnalyticsPlugin.class,
AggregateMetricMapperPlugin.class,
DataStreamsPlugin.class
DataStreamsPlugin.class,
IndexLifecycle.class
);
}

Expand All @@ -115,7 +120,7 @@ public void setup() {
startTime = randomLongBetween(946769284000L, 1607470084000L); // random date between 2000-2020
docCount = randomIntBetween(10, 9000);
numOfShards = randomIntBetween(1, 4);
numOfReplicas = 0; // Since this is a single node, we cannot have replicas
numOfReplicas = randomIntBetween(0, 3);

// Values for keyword dimensions
dimensionValues = new ArrayList<>(MAX_DIM_VALUES);
Expand Down Expand Up @@ -170,6 +175,38 @@ public void testRollupIndex() throws IOException {
assertRollupIndex(sourceIndex, rollupIndex, config);
}

public void testCopyIndexSettings() throws IOException {
Settings settings = Settings.builder()
.put(LifecycleSettings.LIFECYCLE_NAME, randomAlphaOfLength(5))
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.getKey(), randomAlphaOfLength(5))
.put(LifecycleSettings.LIFECYCLE_PARSE_ORIGINATION_DATE_SETTING.getKey(), randomBoolean())
.build();
logger.info("Updating index [{}] with settings [{}]", sourceIndex, settings);

var updateSettingsReq = new UpdateSettingsRequest(settings, sourceIndex);
var r = client().admin().indices().updateSettings(updateSettingsReq).actionGet();
assertTrue("Update settings not acked", r.isAcknowledged());

RollupActionConfig config = new RollupActionConfig(randomInterval());
SourceSupplier sourceSupplier = () -> {
String ts = randomDateForInterval(config.getInterval());
return XContentFactory.jsonBuilder()
.startObject()
.field(FIELD_TIMESTAMP, ts)
.field(FIELD_DIMENSION_1, randomFrom(dimensionValues))
.field(FIELD_NUMERIC_1, randomInt())
.endObject();
};
bulkIndex(sourceSupplier);
prepareSourceIndex(sourceIndex);
rollup(sourceIndex, rollupIndex, config);

GetIndexResponse indexSettingsResp = client().admin().indices().prepareGetIndex().addIndices(rollupIndex).get();
for (String key : settings.keySet()) {
assertEquals(settings.get(key), indexSettingsResp.getSetting(rollupIndex, key));
}
}

public void testNullSourceIndexName() {
RollupActionConfig config = new RollupActionConfig(randomInterval());
ActionRequestValidationException exception = expectThrows(
Expand Down

0 comments on commit b3cc4ef

Please sign in to comment.