Merge branch 'upstream/master' into tsdb-required-range-settings
* upstream/master:
  [ML] Parent datafeed actions to the datafeed's persistent task (elastic#81143)
  Simplify ConcreteIndices and its usage in TransportBulkAction (elastic#81098)
  Unmute DataStreamsSnapshotsIT#testRestoreDataStreamAliasWithConflictingIndicesAlias() test (elastic#81142)
  TSDB: Do not allow index splits for time series indices (elastic#81125)
  Reduce verbosity-increase timeout to 3m (elastic#81118)
  Mute DataStreamsSnapshotsIT#testRestoreDataStreamAliasWithConflictingIndicesAlias() test
  Fix stopping of old elasticsearch cluster (elastic#81059)
  Fix data stream alias validation. (elastic#81040)
  Add replicated field to get data stream api response. (elastic#80988)
weizijun committed Nov 30, 2021
2 parents cbf1ad0 + fe7c2d5 commit 87c8713
Showing 20 changed files with 438 additions and 68 deletions.
@@ -35,6 +35,7 @@ public final class DataStream {
@Nullable
private final Map<String, Object> metadata;
private final boolean allowCustomRouting;
private final boolean replicated;

public DataStream(
String name,
@@ -47,7 +48,8 @@ public DataStream(
@Nullable Map<String, Object> metadata,
boolean hidden,
boolean system,
boolean allowCustomRouting
boolean allowCustomRouting,
boolean replicated
) {
this.name = name;
this.timeStampField = timeStampField;
@@ -60,6 +62,7 @@ public DataStream(
this.hidden = hidden;
this.system = system;
this.allowCustomRouting = allowCustomRouting;
this.replicated = replicated;
}

public String getName() {
@@ -106,6 +109,10 @@ public boolean allowsCustomRouting() {
return allowCustomRouting;
}

public boolean isReplicated() {
return replicated;
}

public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
public static final ParseField INDICES_FIELD = new ParseField("indices");
@@ -117,6 +124,7 @@ public boolean allowsCustomRouting() {
public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
public static final ParseField SYSTEM_FIELD = new ParseField("system");
public static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
public static final ParseField REPLICATED = new ParseField("replicated");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream", args -> {
@@ -132,6 +140,7 @@ public boolean allowsCustomRouting() {
boolean hidden = args[8] != null && (boolean) args[8];
boolean system = args[9] != null && (boolean) args[9];
boolean allowCustomRouting = args[10] != null && (boolean) args[10];
boolean replicated = args[11] != null && (boolean) args[11];
return new DataStream(
dataStreamName,
timeStampField,
@@ -143,7 +152,8 @@ public boolean allowsCustomRouting() {
metadata,
hidden,
system,
allowCustomRouting
allowCustomRouting,
replicated
);
});

@@ -159,6 +169,7 @@ public boolean allowsCustomRouting() {
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ALLOW_CUSTOM_ROUTING);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED);
}

public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -180,7 +191,8 @@ public boolean equals(Object o) {
&& Objects.equals(indexTemplate, that.indexTemplate)
&& Objects.equals(ilmPolicyName, that.ilmPolicyName)
&& Objects.equals(metadata, that.metadata)
&& allowCustomRouting == that.allowCustomRouting;
&& allowCustomRouting == that.allowCustomRouting
&& replicated == that.replicated;
}

@Override
@@ -196,7 +208,8 @@ public int hashCode() {
metadata,
hidden,
system,
allowCustomRouting
allowCustomRouting,
replicated
);
}
}
@@ -578,7 +578,8 @@ stream's oldest backing index.
"template": "my-data-stream-template",
"hidden": false,
"system": false,
"allow_custom_routing": false
"allow_custom_routing": false,
"replicated": false
}
]
}
15 changes: 13 additions & 2 deletions docs/reference/indices/get-data-stream.asciidoc
@@ -207,6 +207,15 @@ use the <<indices-get-settings,get index settings API>>.
(Boolean)
If `true`, the data stream is created and managed by an Elastic stack component
and cannot be modified through normal user interaction.
`allow_custom_routing`::
(Boolean)
If `true`, the data stream allows custom routing on write requests.
`replicated`::
(Boolean)
If `true`, the data stream is created and managed by {ccr} and the local
cluster cannot write into this data stream or change its mappings.
====

[[get-data-stream-api-example]]
@@ -247,7 +256,8 @@ The API returns the following response:
"ilm_policy": "my-lifecycle-policy",
"hidden": false,
"system": false,
"allow_custom_routing": false
"allow_custom_routing": false,
"replicated": false
},
{
"name": "my-data-stream-two",
@@ -269,7 +279,8 @@ The API returns the following response:
"ilm_policy": "my-lifecycle-policy",
"hidden": false,
"system": false,
"allow_custom_routing": false
"allow_custom_routing": false,
"replicated": false
}
]
}
@@ -89,7 +89,7 @@ the request to have failed. Defaults to `3s`.
(<<static-cluster-setting,Static>>)
Sets how long a node will attempt to discover its peers before it starts to log
verbose messages describing why the connection attempts are failing. Defaults
to `5m`.
to `3m`.

`discovery.seed_resolver.max_concurrent_resolvers`::
(<<static-cluster-setting,Static>>)
@@ -1,7 +1,7 @@
setup:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0
version: " - 8.00.99"
reason: introduced in 8.1.0
features: "arbitrary_key"

# Force allocating all shards to a single node so that we can shrink later.
@@ -85,11 +85,11 @@ setup:
---
split:
- skip:
version: all
reason: shard splitting doesn't work yet
features: "arbitrary_key"
version: " - 8.00.99"
reason: index-split check introduced in 8.1.0

- do:
catch: /index-split is not supported because the destination index \[test\] is in time series mode/
indices.split:
index: test
target: test_split
@@ -98,19 +98,6 @@ split:
index.number_of_replicas: 0
index.number_of_shards: 6

- do:
search:
index: test_split
body:
fields:
- field: _tsid
query:
query_string:
query: '+@timestamp:"2021-04-28T18:51:04.467Z" +k8s.pod.name:cat'

- match: {hits.total.value: 1}
- match: {hits.hits.0.fields._tsid: [{k8s.pod.uid: 947e4ced-1786-4e53-9e0c-5c447e959507, metricset: pod}]}

---
shrink:
- skip:
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@@ -124,6 +125,11 @@ protected void masterOperation(
return;
}

// Index splits are not allowed for time-series indices
if (resizeRequest.getResizeType() == ResizeType.SPLIT) {
IndexRouting.fromIndexMetadata(sourceMetadata).checkIndexSplitAllowed();
}

IndicesStatsRequestBuilder statsRequestBuilder = client.admin()
.indices()
.prepareStats(sourceIndex)
@@ -511,7 +511,7 @@ protected void doRun() {
if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) {
continue;
}
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) {
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices)) {
continue;
}
Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
@@ -591,7 +591,7 @@ public void onResponse(BulkShardResponse bulkShardResponse) {
public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
final String indexName = request.index();
DocWriteRequest<?> docWriteRequest = request.request();
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e);
responses.set(request.id(), BulkItemResponse.failure(request.id(), docWriteRequest.opType(), failure));
@@ -678,29 +678,18 @@ private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest<?> re
return false;
}

private boolean addFailureIfIndexIsUnavailable(
DocWriteRequest<?> request,
int idx,
final ConcreteIndices concreteIndices,
final Metadata metadata
) {
private boolean addFailureIfIndexIsUnavailable(DocWriteRequest<?> request, int idx, final ConcreteIndices concreteIndices) {
IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index());
if (cannotCreate != null) {
addFailure(request, idx, cannotCreate);
return true;
}
Index concreteIndex = concreteIndices.getConcreteIndex(request.index());
if (concreteIndex == null) {
try {
concreteIndex = concreteIndices.resolveIfAbsent(request);
} catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
addFailure(request, idx, ex);
return true;
}
}
IndexMetadata indexMetadata = metadata.getIndexSafe(concreteIndex);
if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
addFailure(request, idx, new IndexClosedException(concreteIndex));
try {
assert request.indicesOptions().forbidClosedIndices() : "only open indices can be resolved";
Index concreteIndex = concreteIndices.resolveIfAbsent(request);
assert concreteIndex != null;
} catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
addFailure(request, idx, ex);
return true;
}
return false;
@@ -738,10 +727,6 @@ private static class ConcreteIndices {
this.indexNameExpressionResolver = indexNameExpressionResolver;
}

Index getConcreteIndex(String indexOrAlias) {
return indices.get(indexOrAlias);
}

Index resolveIfAbsent(DocWriteRequest<?> request) {
Index concreteIndex = indices.get(request.index());
if (concreteIndex == null) {
@@ -1587,17 +1587,23 @@ public Metadata build(boolean builtIndicesLookupEagerly) {
indexMetadata.getAliases().keysIt().forEachRemaining(allAliases::add);
}

final ArrayList<String> duplicates = new ArrayList<>();
final Set<String> allDataStreams = new HashSet<>();
DataStreamMetadata dataStreamMetadata = (DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE);
if (dataStreamMetadata != null) {
for (DataStream dataStream : dataStreamMetadata.dataStreams().values()) {
allDataStreams.add(dataStream.getName());
}
// Adding data stream aliases:
for (String dataStreamAlias : dataStreamMetadata.getDataStreamAliases().keySet()) {
if (allAliases.add(dataStreamAlias) == false) {
duplicates.add("data stream alias and indices alias have the same name (" + dataStreamAlias + ")");
}
}
}

final Set<String> aliasDuplicatesWithIndices = new HashSet<>(allAliases);
aliasDuplicatesWithIndices.retainAll(allIndices);
ArrayList<String> duplicates = new ArrayList<>();
if (aliasDuplicatesWithIndices.isEmpty() == false) {
// iterate again and construct a helpful message
for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
@@ -1613,12 +1619,19 @@ public Metadata build(boolean builtIndicesLookupEagerly) {
aliasDuplicatesWithDataStreams.retainAll(allDataStreams);
if (aliasDuplicatesWithDataStreams.isEmpty() == false) {
// iterate again and construct a helpful message
for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
for (String alias : aliasDuplicatesWithDataStreams) {
for (String alias : aliasDuplicatesWithDataStreams) {
// reported var avoids adding a message twice if an index alias has the same name as a data stream.
boolean reported = false;
for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
if (cursor.value.getAliases().containsKey(alias)) {
duplicates.add(alias + " (alias of " + cursor.value.getIndex() + ") conflicts with data stream");
reported = true;
}
}
// Adds an error message when a data stream alias has the same name as a data stream.
if (reported == false && dataStreamMetadata != null && dataStreamMetadata.dataStreams().containsKey(alias)) {
duplicates.add("data stream alias and data stream have the same name (" + alias + ")");
}
}
}
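To make the intent of the new validation easier to follow, here is a small, self-contained sketch of the same set-intersection checks using plain collections. The variable names and sample values are stand-ins for illustration only, not the actual Metadata.Builder fields.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AliasConflictSketch {
    public static void main(String[] args) {
        // Hypothetical cluster content: index aliases, data stream aliases, and data stream names.
        Set<String> indexAliases = new HashSet<>(Set.of("logs-alias", "shared-name"));
        Set<String> dataStreamAliases = new HashSet<>(Set.of("shared-name", "metrics"));
        Set<String> dataStreams = new HashSet<>(Set.of("metrics", "logs-ds"));

        List<String> duplicates = new ArrayList<>();

        // A data stream alias may not reuse the name of an index alias.
        Set<String> aliasVsAlias = new HashSet<>(dataStreamAliases);
        aliasVsAlias.retainAll(indexAliases);
        for (String name : aliasVsAlias) {
            duplicates.add("data stream alias and indices alias have the same name (" + name + ")");
        }

        // A data stream alias may not reuse the name of a data stream either.
        Set<String> aliasVsDataStream = new HashSet<>(dataStreamAliases);
        aliasVsDataStream.retainAll(dataStreams);
        for (String name : aliasVsDataStream) {
            duplicates.add("data stream alias and data stream have the same name (" + name + ")");
        }

        duplicates.forEach(System.out::println);
    }
}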

@@ -110,6 +110,12 @@ private static int effectiveRoutingToHash(String effectiveRouting) {
return Murmur3HashFunction.hash(effectiveRouting);
}

/**
* Check if the _split index operation is allowed for an index
* @throws IllegalArgumentException if the operation is not allowed
*/
public void checkIndexSplitAllowed() {}

private abstract static class IdAndRoutingOnly extends IndexRouting {
private final boolean routingRequired;

@@ -295,6 +301,11 @@ public int getShard(String id, @Nullable String routing) {
throw new IllegalArgumentException(error("get"));
}

@Override
public void checkIndexSplitAllowed() {
throw new IllegalArgumentException(error("index-split"));
}

@Override
public void collectSearchShards(String routing, IntConsumer consumer) {
throw new IllegalArgumentException(error("searching with a specified routing"));
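A minimal sketch of the hook pattern introduced here: the base routing strategy exposes a no-op checkIndexSplitAllowed(), and the time-series strategy overrides it to reject splits, which TransportResizeAction then invokes only for SPLIT requests. The class names below are simplified stand-ins, not the real Elasticsearch types.

public class SplitCheckSketch {

    // Stand-in for IndexRouting: splitting is allowed by default, so the check is a no-op.
    abstract static class Routing {
        /** @throws IllegalArgumentException if the _split operation is not allowed */
        void checkIndexSplitAllowed() {}
    }

    // Stand-in for the id/routing-based strategies, which keep the default no-op check.
    static class IdAndRoutingOnly extends Routing {}

    // Stand-in for the time-series (routing path) strategy, which rejects splits.
    static class TimeSeriesRouting extends Routing {
        @Override
        void checkIndexSplitAllowed() {
            throw new IllegalArgumentException(
                "index-split is not supported because the destination index is in time series mode"
            );
        }
    }

    public static void main(String[] args) {
        // Mirrors the call site in TransportResizeAction: run the check only for SPLIT requests.
        for (Routing routing : new Routing[] { new IdAndRoutingOnly(), new TimeSeriesRouting() }) {
            try {
                routing.checkIndexSplitAllowed();
                System.out.println(routing.getClass().getSimpleName() + ": split allowed");
            } catch (IllegalArgumentException e) {
                System.out.println(routing.getClass().getSimpleName() + ": " + e.getMessage());
            }
        }
    }
}

Keeping the default a no-op means only the routing strategy that actually forbids splitting has to know about the restriction; new strategies stay split-friendly unless they opt out.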
@@ -68,7 +68,7 @@ public abstract class PeerFinder {
// the master before we start to emit more verbose logs.
public static final Setting<TimeValue> VERBOSITY_INCREASE_TIMEOUT_SETTING = Setting.timeSetting(
"discovery.find_peers_warning_timeout",
TimeValue.timeValueMinutes(5),
TimeValue.timeValueMinutes(3),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
);
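For reference, a small sketch showing that the documented discovery.find_peers_warning_timeout key resolves to this setting, and how an explicit override compares with the new 3m default. It assumes the server's Settings and PeerFinder classes are on the classpath; package locations may differ between versions, and the 10m value is only an example.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.PeerFinder;

public class PeerFinderTimeoutExample {
    public static void main(String[] args) {
        // With no override, the setting falls back to its default (3m after this change).
        Settings defaults = Settings.EMPTY;
        System.out.println(PeerFinder.VERBOSITY_INCREASE_TIMEOUT_SETTING.get(defaults));

        // An explicit override, i.e. what `discovery.find_peers_warning_timeout: 10m`
        // in elasticsearch.yml would resolve to (a static setting, so node-level only).
        Settings overridden = Settings.builder()
            .put("discovery.find_peers_warning_timeout", "10m")
            .build();
        System.out.println(PeerFinder.VERBOSITY_INCREASE_TIMEOUT_SETTING.get(overridden));
    }
}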