Skip to content

Commit

Permalink
Add system data stream support for DSL downsampling
Browse files Browse the repository at this point in the history
  • Loading branch information
andreidan committed Sep 15, 2023
1 parent 37a2a37 commit 2500acf
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ public Tuple<ClusterState, Void> executeTask(DeleteSourceAndAddDownsampleToDS ta
@Override
public void taskSucceeded(DeleteSourceAndAddDownsampleToDS task, Void unused) {
LOGGER.trace(
"Updated cluster state and replaced index [{}] with index [{}] in data stream [{}]",
"Updated cluster state and replaced index [{}] with index [{}] in data stream [{}]. Index [{}] was deleted",
task.getSourceBackingIndex(),
task.getDownsampleIndex(),
task.getDataStreamName()
task.getDataStreamName(),
task.getSourceBackingIndex()
);
task.getListener().onResponse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public DeleteSourceAndAddDownsampleToDS(

ClusterState execute(ClusterState state) {
LOGGER.trace(
"Updating cluster state to replace index [{}] with [{}] in data stream [{}]",
"Updating cluster state to replace and delete index [{}] with [{}] in data stream [{}]",
sourceBackingIndex,
downsampleIndex,
dataStreamName
Expand All @@ -67,7 +67,7 @@ ClusterState execute(ClusterState state) {
if (downsampleIndexMeta == null) {
// the downsample index doesn't exist anymore so nothing to replace here
LOGGER.trace(
"Received request replace index [{}] with [{}] in data stream [{}] but the replacement index [{}] doesn't exist."
"Received request to replace index [{}] with [{}] in data stream [{}] but the replacement index [{}] doesn't exist."
+ "Nothing to do here.",
sourceBackingIndex,
downsampleIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,6 @@ static ClusterState updateDataLifecycle(
Metadata.Builder builder = Metadata.builder(metadata);
for (var dataStreamName : dataStreamNames) {
var dataStream = validateDataStream(metadata, dataStreamName);
if (dataStream.isSystem()) {
if (lifecycle != null && lifecycle.getDownsamplingRounds() != null) {
throw new IllegalArgumentException(
"System data streams do not support downsampling as part of their lifecycle configuration. Encountered ["
+ dataStream.getName()
+ "] in the request"
);
}
}
builder.put(
new DataStream(
dataStream.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -89,7 +87,6 @@ public SystemDataStreamDescriptor(
this.type = Objects.requireNonNull(type, "type must be specified");
this.composableIndexTemplate = Objects.requireNonNull(composableIndexTemplate, "composableIndexTemplate must be provided");
this.componentTemplates = componentTemplates == null ? Map.of() : Map.copyOf(componentTemplates);
validateNoDownsamplingConfigured(composableIndexTemplate, componentTemplates);
this.allowedElasticProductOrigins = Objects.requireNonNull(
allowedElasticProductOrigins,
"allowedElasticProductOrigins must not be null"
Expand All @@ -102,16 +99,6 @@ public SystemDataStreamDescriptor(
this.characterRunAutomaton = new CharacterRunAutomaton(buildAutomaton(backingIndexPatternForDataStream(this.dataStreamName)));
}

private void validateNoDownsamplingConfigured(
ComposableIndexTemplate composableIndexTemplate,
Map<String, ComponentTemplate> componentTemplates
) {
DataStreamLifecycle resolvedLifecycle = MetadataIndexTemplateService.resolveLifecycle(composableIndexTemplate, componentTemplates);
if (resolvedLifecycle != null && resolvedLifecycle.isEnabled() && resolvedLifecycle.getDownsamplingRounds() != null) {
throw new IllegalArgumentException("System data streams do not support downsampling as part of their lifecycle configuration");
}
}

public String getDataStreamName() {
return dataStreamName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ public class InternalUsers {
ForceMergeAction.NAME + "*",
// indices stats is used by rollover, so we need to grant it here
IndicesStatsAction.NAME + "*",
UpdateSettingsAction.NAME
// Down-sampling related actions are not granted here because down-sampling is not supported for system data streams
UpdateSettingsAction.NAME,
DownsampleAction.NAME,
AddIndexBlockAction.NAME
)
.allowRestrictedIndices(true)
.build() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ public void testDataStreamLifecycleUser() {
DeleteIndexAction.NAME,
ForceMergeAction.NAME,
IndicesStatsAction.NAME,
UpdateSettingsAction.NAME
UpdateSettingsAction.NAME,
DownsampleAction.NAME,
AddIndexBlockAction.NAME
);
final String dataStream = randomAlphaOfLengthBetween(3, 12);
checkIndexAccess(role, randomFrom(sampleIndexActions), dataStream, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamLifecycleAction;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -95,11 +94,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(
LocalStateSecurity.class,
DataStreamsPlugin.class,
SystemDataStreamTestPlugin.class,
MapperExtrasPlugin.class,
Wildcard.class,
Downsample.class,
AggregateMetricMapperPlugin.class
AggregateMetricMapperPlugin.class,
SystemDataStreamWithDownsamplingConfigurationPlugin.class
);
}

Expand Down Expand Up @@ -135,70 +134,27 @@ public void testDownsamplingAuthorized() throws Exception {
waitAndAssertDownsamplingCompleted(dataStreamName);
}

public void testConfiguringLifecycleWithDownsamplingForSystemDataStreamFails() {
String dataStreamName = SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME;
indexDocuments(client(), dataStreamName, 100);
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
.downsampling(
new DataStreamLifecycle.Downsampling(
List.of(
new DataStreamLifecycle.Downsampling.Round(
TimeValue.timeValueMillis(0),
new DownsampleConfig(new DateHistogramInterval("5m"))
),
new DataStreamLifecycle.Downsampling.Round(
TimeValue.timeValueSeconds(10),
new DownsampleConfig(new DateHistogramInterval("10m"))
)
)
)
)
.build();
IllegalArgumentException illegalArgumentException = expectThrows(
IllegalArgumentException.class,
() -> client().execute(
PutDataStreamLifecycleAction.INSTANCE,
new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, lifecycle)
).actionGet()
);
assertThat(
illegalArgumentException.getMessage(),
is(
"System data streams do not support downsampling as part of their lifecycle "
+ "configuration. Encountered ["
+ dataStreamName
+ "] in the request"
)
);
}

public void testExplicitSystemDataStreamConfigurationWithDownsamplingFails() {
SystemDataStreamWithDownsamplingConfigurationPlugin pluginWithIllegalSystemDataStream =
new SystemDataStreamWithDownsamplingConfigurationPlugin();
IllegalArgumentException illegalArgumentException = expectThrows(
IllegalArgumentException.class,
() -> pluginWithIllegalSystemDataStream.getSystemDataStreamDescriptors()
);
assertThat(
illegalArgumentException.getMessage(),
is("System data streams do not support downsampling as part of their lifecycle configuration")
);
@TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging")
public void testSystemDataStreamConfigurationWithDownsampling() throws Exception {
String dataStreamName = SystemDataStreamWithDownsamplingConfigurationPlugin.SYSTEM_DATA_STREAM_NAME;
indexDocuments(client(), dataStreamName, 10_000);
waitAndAssertDownsamplingCompleted(dataStreamName);
}

private void waitAndAssertDownsamplingCompleted(String dataStreamName) throws Exception {
List<Index> backingIndices = getDataStreamBackingIndices(dataStreamName);
String firstGenerationBackingIndex = backingIndices.get(0).getName();
String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex;
String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex;
String firstRoundDownsamplingIndex = "downsample-5m-" + firstGenerationBackingIndex;
String secondRoundDownsamplingIndex = "downsample-10m-" + firstGenerationBackingIndex;

Set<String> witnessedDownsamplingIndices = new HashSet<>();
clusterService().addListener(event -> {
if (event.indicesCreated().contains(oneSecondDownsampleIndex)
|| event.indicesDeleted().stream().anyMatch(index -> index.getName().equals(oneSecondDownsampleIndex))) {
witnessedDownsamplingIndices.add(oneSecondDownsampleIndex);
if (event.indicesCreated().contains(firstRoundDownsamplingIndex)
|| event.indicesDeleted().stream().anyMatch(index -> index.getName().equals(firstRoundDownsamplingIndex))) {
witnessedDownsamplingIndices.add(firstRoundDownsamplingIndex);
}
if (event.indicesCreated().contains(tenSecondsDownsampleIndex)) {
witnessedDownsamplingIndices.add(tenSecondsDownsampleIndex);
if (event.indicesCreated().contains(secondRoundDownsamplingIndex)) {
witnessedDownsamplingIndices.add(secondRoundDownsamplingIndex);
}
});

Expand All @@ -207,15 +163,15 @@ private void waitAndAssertDownsamplingCompleted(String dataStreamName) throws Ex
assertBusy(() -> {
assertNoAuthzErrors();
// first downsampling round
assertThat(witnessedDownsamplingIndices.contains(oneSecondDownsampleIndex), is(true));
assertThat(witnessedDownsamplingIndices.contains(firstRoundDownsamplingIndex), is(true));
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
assertNoAuthzErrors();
assertThat(witnessedDownsamplingIndices.size(), is(2));
assertThat(witnessedDownsamplingIndices.contains(oneSecondDownsampleIndex), is(true));
assertThat(witnessedDownsamplingIndices.contains(firstRoundDownsamplingIndex), is(true));

assertThat(witnessedDownsamplingIndices.contains(tenSecondsDownsampleIndex), is(true));
assertThat(witnessedDownsamplingIndices.contains(secondRoundDownsamplingIndex), is(true));
}, 30, TimeUnit.SECONDS);

assertBusy(() -> {
Expand All @@ -226,9 +182,9 @@ private void waitAndAssertDownsamplingCompleted(String dataStreamName) throws Ex
String writeIndex = dsBackingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
// the last downsampling round must remain in the data stream
assertThat(dsBackingIndices.get(0).getName(), is(tenSecondsDownsampleIndex));
assertThat(dsBackingIndices.get(0).getName(), is(secondRoundDownsamplingIndex));
assertThat(indexExists(firstGenerationBackingIndex), is(false));
assertThat(indexExists(oneSecondDownsampleIndex), is(false));
assertThat(indexExists(firstRoundDownsamplingIndex), is(false));
}, 30, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -378,55 +334,6 @@ private void bulkIndex(Client client, String dataStreamName, Supplier<XContentBu
logger.info("-> Indexed [{}] documents. Dropped [{}] duplicates.", docsIndexed, duplicates);
}

public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin {

static final String SYSTEM_DATA_STREAM_NAME = ".fleet-actions-results";

@Override
public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
Settings.Builder settings = indexSettings(1, 0).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
.putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_1));

try {
return List.of(
new SystemDataStreamDescriptor(
SYSTEM_DATA_STREAM_NAME,
"a system data stream for testing",
SystemDataStreamDescriptor.Type.EXTERNAL,
new ComposableIndexTemplate(
List.of(SYSTEM_DATA_STREAM_NAME),
new Template(settings.build(), getTSDBMappings(), null, null),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate()
),
Map.of(),
Collections.singletonList("test"),
new ExecutorNames(
ThreadPool.Names.SYSTEM_CRITICAL_READ,
ThreadPool.Names.SYSTEM_READ,
ThreadPool.Names.SYSTEM_WRITE
)
)
);
} catch (IOException e) {
throw new RuntimeException("Unable to create system data stream descriptor", e);
}
}

@Override
public String getFeatureName() {
return SystemDataStreamTestPlugin.class.getSimpleName();
}

@Override
public String getFeatureDescription() {
return "A plugin for testing the data stream lifecycle runtime actions on system data streams";
}
}

public static class SystemDataStreamWithDownsamplingConfigurationPlugin extends Plugin implements SystemIndexPlugin {

static final String SYSTEM_DATA_STREAM_NAME = ".fleet-actions-results";
Expand Down Expand Up @@ -484,7 +391,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {

@Override
public String getFeatureName() {
return SystemDataStreamTestPlugin.class.getSimpleName();
return SystemDataStreamWithDownsamplingConfigurationPlugin.class.getSimpleName();
}

@Override
Expand Down

0 comments on commit 2500acf

Please sign in to comment.