Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSDB: add index timestamp range check #78291

Merged
merged 30 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2bad389
add settings
weizijun Sep 24, 2021
8809757
add check code
weizijun Sep 24, 2021
6c42357
add some comment
weizijun Sep 24, 2021
c3f1e66
add yaml tests
weizijun Sep 26, 2021
79e96e0
add the time_series mode check
weizijun Sep 27, 2021
b4a68c3
improve
weizijun Sep 28, 2021
63e552c
Revert "improve"
weizijun Sep 28, 2021
0e67726
Revert "Revert "improve""
weizijun Sep 28, 2021
e3b7256
Merge branch 'upstream/master' into tsdb-deal-timestreamp
weizijun Oct 18, 2021
78c132d
spotless
weizijun Oct 18, 2021
fe8f654
Merge branch 'upstream/master' into tsdb-deal-timestreamp
weizijun Oct 26, 2021
120cdaf
improve
weizijun Oct 27, 2021
563ca87
fix ccr tests
weizijun Oct 27, 2021
daccfb7
Merge branch 'master' into tsdb-deal-timestreamp
weizijun Oct 27, 2021
14a2378
spotless
weizijun Oct 27, 2021
2c45c57
improve
weizijun Oct 28, 2021
d2a00ec
Merge branch 'master' into tsdb-deal-timestreamp
elasticmachine Oct 28, 2021
d631649
Merge branch 'master' into tsdb-deal-timestreamp
elasticmachine Nov 2, 2021
1dcebb0
fix bwc
weizijun Nov 2, 2021
79db882
improve
weizijun Nov 3, 2021
69df14d
Maybe like this?
nik9000 Nov 3, 2021
6c77b18
fix failed tests
weizijun Nov 11, 2021
40f2e82
fix failed tests
weizijun Nov 11, 2021
582588f
fix failed tests
weizijun Nov 11, 2021
bfe25c5
improve
weizijun Nov 11, 2021
6350ee1
checkStyle
weizijun Nov 11, 2021
a5a116f
spotless
weizijun Nov 11, 2021
7236ee1
fixex tsdb settings tests
weizijun Nov 11, 2021
20c21a8
Merge branch 'master' into tsdb-deal-timestreamp
nik9000 Nov 11, 2021
9e92d09
Fixup after merge
nik9000 Nov 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,65 @@ routing required:
mappings:
_routing:
required: true
---
set start_time and end_time:
- skip:
version: " - 8.0.99"
reason: introduced in 8.1.0
- do:
indices.create:
index: test_index
body:
settings:
index:
mode: time_series
routing_path: [metricset]
time_series:
start_time: 1632625782000
end_time: 1632625792000

- do:
indices.put_settings:
index: test_index
body:
index:
time_series:
end_time: 1632625793000

- do:
catch: /index.time_series.end_time must be larger than current value \[1632625793000\]/
indices.put_settings:
index: test_index
body:
index:
time_series:
end_time: 1632625792000

- do:
indices.delete:
index: test_index

---
set start_time and end_time without timeseries mode:
- skip:
version: " - 8.0.99"
reason: introduced in 8.1.0
- do:
catch: /index.time_series.start_time need to be used for time_series mode/
indices.create:
index: test_index
body:
settings:
index:
time_series:
start_time: 1632625782000

- do:
catch: /index.time_series.end_time need to be used for time_series mode/
indices.create:
index: test_index
body:
settings:
index:
time_series:
end_time: 1632625782000
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ private static Set<Setting<?>> builtInIndexSettings() {
Set<Setting<?>> result = new HashSet<>(ALWAYS_ENABLED_BUILT_IN_INDEX_SETTINGS);
result.add(IndexSettings.MODE);
result.add(IndexMetadata.INDEX_ROUTING_PATH);
result.add(IndexSettings.TIME_SERIES_START_TIME);
result.add(IndexSettings.TIME_SERIES_END_TIME);
return Set.copyOf(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -1331,6 +1333,16 @@ public static Setting<Long> longSetting(String key, long defaultValue, long minV
);
}

public static Setting<Instant> dateSetting(String key, Instant defaultValue, Validator<Instant> validator, Property... properties) {
return new Setting<>(
key,
defaultValue.toString(),
(s) -> Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(s)),
validator,
properties
);
}

public static Setting<String> simpleString(String key, Property... properties) {
return new Setting<>(key, s -> "", Function.identity(), properties);
}
Expand Down
36 changes: 24 additions & 12 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@

/**
* "Mode" that controls which behaviors and settings an index supports.
* <p>
* For the most part this class concentrates on validating settings and
* mappings. Most different behavior is controlled by forcing settings
* to be set or not set and by enabling extra fields in the mapping.
*/
public enum IndexMode {
STANDARD {
@Override
void validateWithOtherSettings(Map<Setting<?>, Object> settings) {
if (false == Objects.equals(
IndexMetadata.INDEX_ROUTING_PATH.getDefault(Settings.EMPTY),
settings.get(IndexMetadata.INDEX_ROUTING_PATH)
)) {
throw new IllegalArgumentException(
"[" + IndexMetadata.INDEX_ROUTING_PATH.getKey() + "] requires [" + IndexSettings.MODE.getKey() + "=time_series]"
);
settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH);
settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_START_TIME);
settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_END_TIME);
}

private void settingRequiresTimeSeries(Map<Setting<?>, Object> settings, Setting<?> setting) {
if (false == Objects.equals(setting.getDefault(Settings.EMPTY), settings.get(setting))) {
throw new IllegalArgumentException("[" + setting.getKey() + "] requires [" + IndexSettings.MODE.getKey() + "=time_series]");
}
}

Expand All @@ -67,10 +72,12 @@ void validateWithOtherSettings(Map<Setting<?>, Object> settings) {
throw new IllegalArgumentException(error(unsupported));
}
}
if (IndexMetadata.INDEX_ROUTING_PATH.getDefault(Settings.EMPTY).equals(settings.get(IndexMetadata.INDEX_ROUTING_PATH))) {
throw new IllegalArgumentException(
"[" + IndexSettings.MODE.getKey() + "=time_series] requires [" + IndexMetadata.INDEX_ROUTING_PATH.getKey() + "]"
);
settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH);
}

private void settingRequiresTimeSeries(Map<Setting<?>, Object> settings, Setting<?> setting) {
if (Objects.equals(setting.getDefault(Settings.EMPTY), settings.get(setting))) {
throw new IllegalArgumentException("[" + IndexSettings.MODE.getKey() + "=time_series] requires [" + setting.getKey() + "]");
}
}

Expand Down Expand Up @@ -150,7 +157,12 @@ private void validateTimeStampField(Object timestampFieldValue) {

static final List<Setting<?>> VALIDATE_WITH_SETTINGS = List.copyOf(
Stream.concat(
Stream.of(IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING, IndexMetadata.INDEX_ROUTING_PATH),
Stream.of(
IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING,
IndexMetadata.INDEX_ROUTING_PATH,
IndexSettings.TIME_SERIES_START_TIME,
IndexSettings.TIME_SERIES_END_TIME
),
TIME_SERIES_UNSUPPORTED.stream()
).collect(toSet())
);
Expand Down
71 changes: 70 additions & 1 deletion server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Booleans;
Expand All @@ -26,6 +27,7 @@
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.Node;

import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -474,6 +476,46 @@ public static boolean isTimeSeriesModeEnabled() {
return Build.CURRENT.isSnapshot() || (TIME_SERIES_MODE_FEATURE_FLAG_REGISTERED != null && TIME_SERIES_MODE_FEATURE_FLAG_REGISTERED);
}

/**
* in time series mode, the start time of the index, timestamp must larger than start_time
*/
public static final Setting<Instant> TIME_SERIES_START_TIME = Setting.dateSetting(
"index.time_series.start_time",
Instant.ofEpochMilli(0),
v -> {},
Property.IndexScope,
Property.Final
);

/**
* in time series mode, the end time of the index, timestamp must smaller than start_time
*/
public static final Setting<Instant> TIME_SERIES_END_TIME = Setting.dateSetting(
"index.time_series.end_time",
DateUtils.MAX_NANOSECOND_INSTANT,
new Setting.Validator<>() {
@Override
public void validate(Instant value) {}

@Override
public void validate(Instant value, Map<Setting<?>, Object> settings) {
@SuppressWarnings("unchecked")
Instant startTime = (Instant) settings.get(TIME_SERIES_START_TIME);
if (startTime.toEpochMilli() > value.toEpochMilli()) {
throw new IllegalArgumentException("index.time_series.end_time must be larger than index.time_series.start_time");
}
}

@Override
public Iterator<Setting<?>> settings() {
List<Setting<?>> settings = List.of(TIME_SERIES_START_TIME);
return settings.iterator();
}
},
Property.IndexScope,
Property.Dynamic
weizijun marked this conversation as resolved.
Show resolved Hide resolved
);

/**
* The {@link IndexMode "mode"} of the index.
*/
Expand Down Expand Up @@ -509,6 +551,14 @@ public Iterator<Setting<?>> settings() {
* The {@link IndexMode "mode"} of the index.
*/
private final IndexMode mode;
/**
* Start time of the time_series index.
*/
private final Instant timeSeriesStartTime;
/**
* End time of the time_series index.
*/
private volatile Instant timeSeriesEndTime;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried that right now these can't be null so all indices are going to validate the range. I wonder if we can convert these to null for the defaults so we disable the validation when it isn't set.


// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
Expand Down Expand Up @@ -651,7 +701,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
mode = isTimeSeriesModeEnabled() ? scopedSettings.get(MODE) : IndexMode.STANDARD;

timeSeriesStartTime = TIME_SERIES_START_TIME.get(settings);
timeSeriesEndTime = TIME_SERIES_END_TIME.get(settings);
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -765,6 +816,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DEPTH_LIMIT_SETTING, this::setMappingDepthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DIMENSION_FIELDS_LIMIT_SETTING, this::setMappingDimensionFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(TIME_SERIES_END_TIME, this::updateTimeSeriesEndTime);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down Expand Up @@ -1278,4 +1330,21 @@ public long getMappingDimensionFieldsLimit() {
private void setMappingDimensionFieldsLimit(long value) {
this.mappingDimensionFieldsLimit = value;
}

public Instant getTimeSeriesStartTime() {
return timeSeriesStartTime;
}

public Instant getTimeSeriesEndTime() {
return timeSeriesEndTime;
}

public void updateTimeSeriesEndTime(Instant endTime) {
if (this.timeSeriesEndTime.isAfter(endTime)) {
throw new IllegalArgumentException(
"index.time_series.end_time must be larger than current value [" + this.timeSeriesEndTime + "]"
);
}
this.timeSeriesEndTime = endTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,22 @@ NumericType numericType() {
*/
public abstract long roundUpToMillis(long value);

public final void validateTimestamp(long value, DocumentParserContext context) {
if (context.indexSettings().getTimeSeriesStartTime() == null) {
assert context.indexSettings().getTimeSeriesEndTime() == null;
return;
}
long startTime = convert(context.indexSettings().getTimeSeriesStartTime());
if (value < startTime) {
throw new IllegalArgumentException("time series index @timestamp value [" + value + "] must be larger than " + startTime);
}

long endTime = convert(context.indexSettings().getTimeSeriesEndTime());
if (value >= endTime) {
throw new IllegalArgumentException("time series index @timestamp value [" + value + "] must be smaller than " + endTime);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know I wrote this, but on further reading, I think maybe it makes sense to just move it all to DataStreamTimestampFieldMapper. I thought I was being clever, but all of this code should be able to run just fine from there and it'd be nice that DateFieldMapper doesn't need to care about the validation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with moving this logic to DataStreamTimestampFieldMapper, since there there is already validation for the @timestamp field (whether it exists and whether a single value has been specified). And I think at least initially, tsdb will work only with data streams.


public static Resolution ofOrdinal(int ord) {
for (Resolution resolution : values()) {
if (ord == resolution.ordinal()) {
Expand Down Expand Up @@ -244,6 +260,7 @@ public static class Builder extends FieldMapper.Builder {
private final Resolution resolution;
private final Version indexCreatedVersion;
private final ScriptCompiler scriptCompiler;
private final Validate validate;

public Builder(
String name,
Expand All @@ -270,6 +287,7 @@ public Builder(
this.format.setValue(dateFormatter.pattern());
this.locale.setValue(dateFormatter.locale());
}
this.validate = name.equals(DataStreamTimestampFieldMapper.DEFAULT_PATH) ? resolution::validateTimestamp : (v, c) -> {};
}

private DateFormatter buildFormatter() {
Expand Down Expand Up @@ -336,7 +354,16 @@ public DateFieldMapper build(MapperBuilderContext context) {
);

Long nullTimestamp = parseNullValue(ft);
return new DateFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo.build(), nullTimestamp, resolution, this);
return new DateFieldMapper(
name,
ft,
multiFieldsBuilder.build(this, context),
copyTo.build(),
nullTimestamp,
resolution,
this,
validate
);
}
}

Expand Down Expand Up @@ -716,6 +743,7 @@ public DocValueFormat docValueFormat(@Nullable String format, ZoneId timeZone) {
private final Script script;
private final ScriptCompiler scriptCompiler;
private final FieldValues<Long> scriptValues;
private final Validate validate;

private DateFieldMapper(
String simpleName,
Expand All @@ -724,7 +752,8 @@ private DateFieldMapper(
CopyTo copyTo,
Long nullValue,
Resolution resolution,
Builder builder
Builder builder,
Validate validate
) {
super(simpleName, mappedFieldType, multiFields, copyTo, builder.script.get() != null, builder.onScriptError.get());
this.store = builder.store.getValue();
Expand All @@ -741,6 +770,7 @@ private DateFieldMapper(
this.script = builder.script.get();
this.scriptCompiler = builder.scriptCompiler;
this.scriptValues = builder.scriptValues();
this.validate = validate;
}

@Override
Expand Down Expand Up @@ -780,6 +810,7 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
}
}
}
validate.validate(timestamp, context);

indexValue(context, timestamp);
}
Expand Down Expand Up @@ -815,4 +846,8 @@ public boolean getIgnoreMalformed() {
public Long getNullValue() {
return nullValue;
}

private interface Validate {
void validate(long value, DocumentParserContext context);
}
}
Loading