Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSDB: add index timestamp range check #78291

Merged
merged 30 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2bad389
add settings
weizijun Sep 24, 2021
8809757
add check code
weizijun Sep 24, 2021
6c42357
add some comment
weizijun Sep 24, 2021
c3f1e66
add yaml tests
weizijun Sep 26, 2021
79e96e0
add the time_series mode check
weizijun Sep 27, 2021
b4a68c3
improve
weizijun Sep 28, 2021
63e552c
Revert "improve"
weizijun Sep 28, 2021
0e67726
Revert "Revert "improve""
weizijun Sep 28, 2021
e3b7256
Merge branch 'upstream/master' into tsdb-deal-timestreamp
weizijun Oct 18, 2021
78c132d
spotless
weizijun Oct 18, 2021
fe8f654
Merge branch 'upstream/master' into tsdb-deal-timestreamp
weizijun Oct 26, 2021
120cdaf
improve
weizijun Oct 27, 2021
563ca87
fix ccr tests
weizijun Oct 27, 2021
daccfb7
Merge branch 'master' into tsdb-deal-timestreamp
weizijun Oct 27, 2021
14a2378
spotless
weizijun Oct 27, 2021
2c45c57
improve
weizijun Oct 28, 2021
d2a00ec
Merge branch 'master' into tsdb-deal-timestreamp
elasticmachine Oct 28, 2021
d631649
Merge branch 'master' into tsdb-deal-timestreamp
elasticmachine Nov 2, 2021
1dcebb0
fix bwc
weizijun Nov 2, 2021
79db882
improve
weizijun Nov 3, 2021
69df14d
Maybe like this?
nik9000 Nov 3, 2021
6c77b18
fix failed tests
weizijun Nov 11, 2021
40f2e82
fix failed tests
weizijun Nov 11, 2021
582588f
fix failed tests
weizijun Nov 11, 2021
bfe25c5
improve
weizijun Nov 11, 2021
6350ee1
checkStyle
weizijun Nov 11, 2021
a5a116f
spotless
weizijun Nov 11, 2021
7236ee1
fixex tsdb settings tests
weizijun Nov 11, 2021
20c21a8
Merge branch 'master' into tsdb-deal-timestreamp
nik9000 Nov 11, 2021
9e92d09
Fixup after merge
nik9000 Nov 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,65 @@ routing required:
mappings:
_routing:
required: true
---
set start_time and end_time:
- skip:
version: " - 8.0.99"
reason: introduced in 8.1.0
- do:
indices.create:
index: test_index
body:
settings:
index:
mode: time_series
routing_path: [metricset]
time_series:
start_time: 1632625782000
end_time: 1632625792000

- do:
indices.put_settings:
index: test_index
body:
index:
time_series:
end_time: 1632625793000

- do:
catch: /index.time_series.end_time must be larger than current value \[1632625793000\]/
indices.put_settings:
index: test_index
body:
index:
time_series:
end_time: 1632625792000

- do:
indices.delete:
index: test_index

---
set start_time and end_time without timeseries mode:
- skip:
version: " - 8.0.99"
reason: introduced in 8.1.0
- do:
catch: /\[index.time_series.start_time\] requires \[index.mode=time_series\]/
indices.create:
index: test_index
body:
settings:
index:
time_series:
start_time: 1632625782000

- do:
catch: /\[index.time_series.end_time\] requires \[index.mode=time_series\]/
indices.create:
index: test_index
body:
settings:
index:
time_series:
end_time: 1632625782000
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ private static Set<Setting<?>> builtInIndexSettings() {
Set<Setting<?>> result = new HashSet<>(ALWAYS_ENABLED_BUILT_IN_INDEX_SETTINGS);
result.add(IndexSettings.MODE);
result.add(IndexMetadata.INDEX_ROUTING_PATH);
result.add(IndexSettings.TIME_SERIES_START_TIME);
result.add(IndexSettings.TIME_SERIES_END_TIME);
return Set.copyOf(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -1331,6 +1333,16 @@ public static Setting<Long> longSetting(String key, long defaultValue, long minV
);
}

public static Setting<Instant> dateSetting(String key, Instant defaultValue, Validator<Instant> validator, Property... properties) {
return new Setting<>(
key,
defaultValue.toString(),
(s) -> Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(s)),
validator,
properties
);
}

public static Setting<String> simpleString(String key, Property... properties) {
return new Setting<>(key, s -> "", Function.identity(), properties);
}
Expand Down
37 changes: 25 additions & 12 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@

/**
* "Mode" that controls which behaviors and settings an index supports.
* <p>
* For the most part this class concentrates on validating settings and
* mappings. Most different behavior is controlled by forcing settings
* to be set or not set and by enabling extra fields in the mapping.
*/
public enum IndexMode {
STANDARD {
@Override
void validateWithOtherSettings(Map<Setting<?>, Object> settings) {
if (false == Objects.equals(
IndexMetadata.INDEX_ROUTING_PATH.getDefault(Settings.EMPTY),
settings.get(IndexMetadata.INDEX_ROUTING_PATH)
)) {
throw new IllegalArgumentException(
"[" + IndexMetadata.INDEX_ROUTING_PATH.getKey() + "] requires [" + IndexSettings.MODE.getKey() + "=time_series]"
);
settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH);
settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_START_TIME);
settingRequiresTimeSeries(settings, IndexSettings.TIME_SERIES_END_TIME);
}

private void settingRequiresTimeSeries(Map<Setting<?>, Object> settings, Setting<?> setting) {
if (false == Objects.equals(setting.getDefault(Settings.EMPTY), settings.get(setting))) {
throw new IllegalArgumentException("[" + setting.getKey() + "] requires [" + IndexSettings.MODE.getKey() + "=time_series]");
}
}

Expand All @@ -67,10 +72,13 @@ void validateWithOtherSettings(Map<Setting<?>, Object> settings) {
throw new IllegalArgumentException(error(unsupported));
}
}
if (IndexMetadata.INDEX_ROUTING_PATH.getDefault(Settings.EMPTY).equals(settings.get(IndexMetadata.INDEX_ROUTING_PATH))) {
throw new IllegalArgumentException(
"[" + IndexSettings.MODE.getKey() + "=time_series] requires [" + IndexMetadata.INDEX_ROUTING_PATH.getKey() + "]"
);
settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH);
// TODO make start and stop time required
}

private void settingRequiresTimeSeries(Map<Setting<?>, Object> settings, Setting<?> setting) {
if (Objects.equals(setting.getDefault(Settings.EMPTY), settings.get(setting))) {
throw new IllegalArgumentException("[" + IndexSettings.MODE.getKey() + "=time_series] requires [" + setting.getKey() + "]");
}
}

Expand Down Expand Up @@ -150,7 +158,12 @@ private void validateTimeStampField(Object timestampFieldValue) {

static final List<Setting<?>> VALIDATE_WITH_SETTINGS = List.copyOf(
Stream.concat(
Stream.of(IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING, IndexMetadata.INDEX_ROUTING_PATH),
Stream.of(
IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING,
IndexMetadata.INDEX_ROUTING_PATH,
IndexSettings.TIME_SERIES_START_TIME,
IndexSettings.TIME_SERIES_END_TIME
),
TIME_SERIES_UNSUPPORTED.stream()
).collect(toSet())
);
Expand Down
72 changes: 71 additions & 1 deletion server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Booleans;
Expand All @@ -26,6 +27,7 @@
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.Node;

import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -474,6 +476,46 @@ public static boolean isTimeSeriesModeEnabled() {
return Build.CURRENT.isSnapshot() || (TIME_SERIES_MODE_FEATURE_FLAG_REGISTERED != null && TIME_SERIES_MODE_FEATURE_FLAG_REGISTERED);
}

/**
* in time series mode, the start time of the index, timestamp must larger than start_time
*/
public static final Setting<Instant> TIME_SERIES_START_TIME = Setting.dateSetting(
"index.time_series.start_time",
Instant.ofEpochMilli(0),
v -> {},
Property.IndexScope,
Property.Final
);

/**
* in time series mode, the end time of the index, timestamp must smaller than start_time
*/
public static final Setting<Instant> TIME_SERIES_END_TIME = Setting.dateSetting(
"index.time_series.end_time",
DateUtils.MAX_NANOSECOND_INSTANT,
new Setting.Validator<>() {
@Override
public void validate(Instant value) {}

@Override
public void validate(Instant value, Map<Setting<?>, Object> settings) {
@SuppressWarnings("unchecked")
Instant startTime = (Instant) settings.get(TIME_SERIES_START_TIME);
if (startTime.toEpochMilli() > value.toEpochMilli()) {
throw new IllegalArgumentException("index.time_series.end_time must be larger than index.time_series.start_time");
}
}

@Override
public Iterator<Setting<?>> settings() {
List<Setting<?>> settings = List.of(TIME_SERIES_START_TIME);
return settings.iterator();
}
},
Property.IndexScope,
Property.Dynamic
weizijun marked this conversation as resolved.
Show resolved Hide resolved
);

/**
* The {@link IndexMode "mode"} of the index.
*/
Expand Down Expand Up @@ -509,6 +551,14 @@ public Iterator<Setting<?>> settings() {
* The {@link IndexMode "mode"} of the index.
*/
private final IndexMode mode;
/**
* Start time of the time_series index.
*/
private final long timeSeriesStartTime;
/**
* End time of the time_series index.
*/
private volatile long timeSeriesEndTime;

// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
Expand Down Expand Up @@ -651,7 +701,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
mode = isTimeSeriesModeEnabled() ? scopedSettings.get(MODE) : IndexMode.STANDARD;

timeSeriesStartTime = TIME_SERIES_START_TIME.get(settings).toEpochMilli();
timeSeriesEndTime = TIME_SERIES_END_TIME.get(settings).toEpochMilli();
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -762,6 +813,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DEPTH_LIMIT_SETTING, this::setMappingDepthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DIMENSION_FIELDS_LIMIT_SETTING, this::setMappingDimensionFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(TIME_SERIES_END_TIME, this::updateTimeSeriesEndTime);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down Expand Up @@ -1275,4 +1327,22 @@ public long getMappingDimensionFieldsLimit() {
private void setMappingDimensionFieldsLimit(long value) {
this.mappingDimensionFieldsLimit = value;
}

public long getTimeSeriesStartTime() {
return timeSeriesStartTime;
}

public long getTimeSeriesEndTime() {
return timeSeriesEndTime;
}

public void updateTimeSeriesEndTime(Instant endTimeInstant) {
long endTime = endTimeInstant.toEpochMilli();
if (this.timeSeriesEndTime > endTime) {
throw new IllegalArgumentException(
"index.time_series.end_time must be larger than current value [" + this.timeSeriesEndTime + "]"
);
}
this.timeSeriesEndTime = endTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.search.Query;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
Expand All @@ -23,6 +24,7 @@
import java.util.List;
import java.util.Map;

import static org.elasticsearch.core.TimeValue.NSEC_PER_MSEC;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

/**
Expand Down Expand Up @@ -204,6 +206,29 @@ public void postParse(DocumentParserContext context) throws IOException {
if (numberOfValues > 1) {
throw new IllegalArgumentException("data stream timestamp field [" + DEFAULT_PATH + "] encountered multiple values");
}

validateTimestamp(fields[0], context);
}

private void validateTimestamp(IndexableField field, DocumentParserContext context) {
if (context.indexSettings().getMode() == null || context.indexSettings().getMode() != IndexMode.TIME_SERIES) {
return;
}

long value = field.numericValue().longValue();
if (context.mappingLookup().getMapper(DEFAULT_PATH).typeName().equals(DateFieldMapper.DATE_NANOS_CONTENT_TYPE)) {
value /= NSEC_PER_MSEC;
}

long startTime = context.indexSettings().getTimeSeriesStartTime();
if (value < startTime) {
throw new IllegalArgumentException("time series index @timestamp value [" + value + "] must be larger than " + startTime);
}

long endTime = context.indexSettings().getTimeSeriesEndTime();
if (value >= endTime) {
throw new IllegalArgumentException("time series index @timestamp value [" + value + "] must be smaller than " + endTime);
}
}

@Override
Expand Down
Loading