Skip to content

Migrate aliased indices to data stream #61525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1780b79
migrate aliased indices to data stream
danhermann Aug 18, 2020
36c4ace
update cluster state a single time for all backing indices
danhermann Aug 26, 2020
d32b799
Merge branch 'master' into 61046_migrate_to_data_stream
danhermann Sep 29, 2020
3bfdfda
add data stream timestamp field mapper
danhermann Sep 29, 2020
24eaac9
prohibit migration of aliases without a write index
danhermann Oct 1, 2020
8062020
Merge branch 'master' into 61046_migrate_to_data_stream
elasticmachine Oct 13, 2020
d1d79ab
fix compilation and test
danhermann Oct 13, 2020
c1196d3
add transport actions
danhermann Oct 13, 2020
d583acd
minor fixes
danhermann Oct 13, 2020
9fad1c3
wip on IT
danhermann Oct 19, 2020
ea39069
basic IT
danhermann Oct 19, 2020
b117559
Merge branch 'master' into 61046_migrate_to_data_stream
elasticmachine Oct 19, 2020
9ad2cbf
spotless
danhermann Oct 19, 2020
847ff2b
wip
danhermann Oct 20, 2020
4d2149f
Merge branch 'master' into 61046_migrate_to_data_stream
elasticmachine Oct 21, 2020
6017302
fix test
danhermann Oct 21, 2020
89348ed
Use MapperService to update the mapping.
martijnvg Oct 22, 2020
3550fad
add min node version check, fix spotless
danhermann Oct 22, 2020
4c80672
add tests
danhermann Oct 22, 2020
9cb64c8
spotless
danhermann Oct 22, 2020
35ea278
Merge branch 'master' into 61046_migrate_to_data_stream
elasticmachine Oct 22, 2020
0bda0b9
Merge branch 'master' into 61046_migrate_to_data_stream
danhermann Oct 26, 2020
9816cc3
fix merge error
danhermann Oct 26, 2020
2d9ba91
moar compile errors
danhermann Oct 26, 2020
497f64b
revise tests
danhermann Oct 27, 2020
5152825
spotless
danhermann Oct 27, 2020
264602b
Merge branch 'master' into 61046_migrate_to_data_stream
elasticmachine Oct 27, 2020
768b163
remove test code
danhermann Oct 27, 2020
bbcec4f
revise log message
danhermann Oct 28, 2020
af7d4b4
validate that TSFM can be enabled but not disabled
danhermann Oct 28, 2020
b0c73d4
checkstyle
danhermann Oct 28, 2020
4edaf1d
fix wording
danhermann Oct 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ public ComposableIndexTemplate(List<String> indexPatterns, @Nullable Template te
this(indexPatterns, template, componentTemplates, priority, version, metadata, null, null);
}

/**
 * Convenience constructor that omits the {@code allowAutoCreate} flag.
 * Delegates to the full constructor, passing {@code null} for
 * {@code allowAutoCreate} (no explicit auto-create preference).
 *
 * @param indexPatterns      index patterns this template matches
 * @param template           the template to apply; may be null
 * @param componentTemplates names of component templates to compose; may be null
 * @param priority           template priority; may be null
 * @param version            template version; may be null
 * @param metadata           optional user metadata; may be null
 * @param dataStreamTemplate if non-null, indices created from this template form a data stream
 */
public ComposableIndexTemplate(List<String> indexPatterns, @Nullable Template template, @Nullable List<String> componentTemplates,
@Nullable Long priority, @Nullable Long version, @Nullable Map<String, Object> metadata,
@Nullable DataStreamTemplate dataStreamTemplate) {
this(indexPatterns, template, componentTemplates, priority, version, metadata, dataStreamTemplate, null);
}

public ComposableIndexTemplate(List<String> indexPatterns, @Nullable Template template, @Nullable List<String> componentTemplates,
@Nullable Long priority, @Nullable Long version, @Nullable Map<String, Object> metadata,
@Nullable DataStreamTemplate dataStreamTemplate, @Nullable Boolean allowAutoCreate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public DataStream(String name, TimestampField timeStampField, List<Index> indice
this.generation = generation;
this.metadata = metadata;
assert indices.size() > 0;
assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation));
}

public DataStream(String name, TimestampField timeStampField, List<Index> indices) {
Expand All @@ -80,6 +79,10 @@ public long getGeneration() {
return generation;
}

/**
 * Returns the write index of this data stream, i.e. the last entry in the
 * list of backing indices.
 */
public Index getWriteIndex() {
    final int lastPosition = indices.size() - 1;
    return indices.get(lastPosition);
}

@Nullable
public Map<String, Object> getMetadata() {
return metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Objects;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING;

/**
Expand Down Expand Up @@ -330,7 +329,6 @@ public DataStream(org.elasticsearch.cluster.metadata.DataStream dataStream, List
this.dataStream = dataStream;
this.dataStreamIndices = List.copyOf(dataStreamIndices);
this.writeIndex = dataStreamIndices.get(dataStreamIndices.size() - 1);
assert writeIndex.getIndex().getName().equals(getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -46,7 +48,9 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class MetadataCreateDataStreamService {

Expand Down Expand Up @@ -120,49 +124,77 @@ public CreateDataStreamClusterStateUpdateRequest(String name,

// Creates a data stream from a cluster-state update request by delegating to
// the five-argument overload with no pre-existing backing indices and a null
// write index (so a fresh write index is created for the stream).
// NOTE(review): this span is a flattened diff view — lines from both the old
// and the new revision appear together without +/- markers. The old-revision
// body (the request.name duplicate-check and the split "throws Exception {")
// is display residue, not live code; only the delegation line is current.
static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
ClusterState currentState,
CreateDataStreamClusterStateUpdateRequest request)
throws Exception {
if (currentState.metadata().dataStreams().containsKey(request.name)) {
throw new ResourceAlreadyExistsException("data_stream [" + request.name + "] already exists");
CreateDataStreamClusterStateUpdateRequest request) throws Exception {
return createDataStream(metadataCreateIndexService, currentState, request.name, List.of(), null);
}

/**
* Creates a data stream with the specified properties.
*
* @param metadataCreateIndexService Used if a new write index must be created
* @param currentState Cluster state
* @param dataStreamName Name of the data stream
* @param backingIndices List of backing indices. May be empty
* @param writeIndex Write index for the data stream. If null, a new write index will be created.
* @return Cluster state containing the new data stream
*/
// Creates a data stream named dataStreamName whose backing indices are
// backingIndices plus writeIndex (writeIndex ordered last, per the DataStream
// invariant that the final backing index is the write index). When writeIndex
// is null, a first backing index is created through metadataCreateIndexService.
// NOTE(review): this whole span is a flattened diff view — old-revision lines
// (those referencing request.name) and new-revision lines (those referencing
// dataStreamName) appear interleaved without +/- markers; comments below flag
// each residue run.
static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
ClusterState currentState,
String dataStreamName,
List<IndexMetadata> backingIndices,
IndexMetadata writeIndex) throws Exception
{
// The index-creation service is only required when this method must create
// the write index itself.
if (writeIndex == null) {
Objects.requireNonNull(metadataCreateIndexService);
}
Objects.requireNonNull(currentState);
Objects.requireNonNull(backingIndices);
// Fail fast if a data stream with this name already exists.
if (currentState.metadata().dataStreams().containsKey(dataStreamName)) {
throw new ResourceAlreadyExistsException("data_stream [" + dataStreamName + "] already exists");
}

// NOTE(review): the next two validateIndexOrAliasName lines are the old and
// new variants of the same call (flattened diff); the dataStreamName form is
// the live one.
MetadataCreateIndexService.validateIndexOrAliasName(request.name,
MetadataCreateIndexService.validateIndexOrAliasName(dataStreamName,
(s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));

// Data stream names must be all-lowercase and must not start with a dot.
// NOTE(review): each check appears twice below — the request.name variant is
// old-revision residue.
if (request.name.toLowerCase(Locale.ROOT).equals(request.name) == false) {
throw new IllegalArgumentException("data_stream [" + request.name + "] must be lowercase");
if (dataStreamName.toLowerCase(Locale.ROOT).equals(dataStreamName) == false) {
throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must be lowercase");
}
if (request.name.startsWith(".")) {
throw new IllegalArgumentException("data_stream [" + request.name + "] must not start with '.'");
if (dataStreamName.startsWith(".")) {
throw new IllegalArgumentException("data_stream [" + dataStreamName + "] must not start with '.'");
}

// NOTE(review): old-revision block — unconditional template lookup and
// first-backing-index creation keyed on request.name. Superseded by the
// writeIndex == null branch further down.
ComposableIndexTemplate template = lookupTemplateForDataStream(request.name, currentState.metadata());

String firstBackingIndexName = DataStream.getDefaultBackingIndexName(request.name, 1);
CreateIndexClusterStateUpdateRequest createIndexRequest =
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
.dataStreamName(request.name)
.settings(Settings.builder().put("index.hidden", true).build());
try {
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
} catch (ResourceAlreadyExistsException e) {
// Rethrow as ElasticsearchStatusException, so that bulk transport action doesn't ignore it during
// auto index/data stream creation.
// (otherwise bulk execution fails later, because data stream will also not have been created)
throw new ElasticsearchStatusException("data stream could not be created because backing index [{}] already exists",
RestStatus.BAD_REQUEST, e, firstBackingIndexName);
// New revision resumes here: look up the matching V2 template, then create a
// hidden first backing index only when the caller did not supply a write index.
ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, currentState.metadata());

if (writeIndex == null) {
String firstBackingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
CreateIndexClusterStateUpdateRequest createIndexRequest =
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
.dataStreamName(dataStreamName)
.settings(Settings.builder().put("index.hidden", true).build());
try {
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
} catch (ResourceAlreadyExistsException e) {
// Rethrow as ElasticsearchStatusException, so that bulk transport action doesn't ignore it during
// auto index/data stream creation.
// (otherwise bulk execution fails later, because data stream will also not have been created)
throw new ElasticsearchStatusException("data stream could not be created because backing index [{}] already exists",
RestStatus.BAD_REQUEST, e, firstBackingIndexName);
}
writeIndex = currentState.metadata().index(firstBackingIndexName);
}
// NOTE(review): the firstBackingIndex asserts are old-revision residue; the
// writeIndex asserts are their new-revision replacements.
IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
assert firstBackingIndex != null;
assert firstBackingIndex.mapping() != null : "no mapping found for backing index [" + firstBackingIndexName + "]";
assert writeIndex != null;
assert writeIndex.mapping() != null : "no mapping found for backing index [" + writeIndex.getIndex().getName() + "]";

// Build the DataStream: template's timestamp field, all supplied backing
// indices in order, then the write index appended last.
String fieldName = template.getDataStreamTemplate().getTimestampField();
DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName);
// NOTE(review): the first newDataStream construction below is old-revision
// residue; the dsBackingIndices-based construction is the live code.
DataStream newDataStream =
new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex()), 1L,
template.metadata() != null ? Map.copyOf(template.metadata()) : null);
List<Index> dsBackingIndices = backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList());
dsBackingIndices.add(writeIndex.getIndex());
DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L,
template.metadata() != null ? Map.copyOf(template.metadata()) : null);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
// NOTE(review): first logger.info line is old-revision residue.
logger.info("adding data stream [{}]", request.name);
logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName,
writeIndex.getIndex().getName(),
Strings.arrayToCommaDelimitedString(backingIndices.stream().map(i -> i.getIndex().getName()).toArray()));
return ClusterState.builder(currentState).metadata(builder).build();
}

Expand Down
Loading