Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSDB: add index timestamp range check #78291

Merged
merged 30 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2bad389
add settings
weizijun Sep 24, 2021
8809757
add check code
weizijun Sep 24, 2021
6c42357
add some comment
weizijun Sep 24, 2021
c3f1e66
add yaml tests
weizijun Sep 26, 2021
79e96e0
add the time_series mode check
weizijun Sep 27, 2021
b4a68c3
improve
weizijun Sep 28, 2021
63e552c
Revert "improve"
weizijun Sep 28, 2021
0e67726
Revert "Revert "improve""
weizijun Sep 28, 2021
e3b7256
Merge branch 'upstream/master' into tsdb-deal-timestreamp
weizijun Oct 18, 2021
78c132d
spotless
weizijun Oct 18, 2021
fe8f654
Merge branch 'upstream/master' into tsdb-deal-timestreamp
weizijun Oct 26, 2021
120cdaf
improve
weizijun Oct 27, 2021
563ca87
fix ccr tests
weizijun Oct 27, 2021
daccfb7
Merge branch 'master' into tsdb-deal-timestreamp
weizijun Oct 27, 2021
14a2378
spotless
weizijun Oct 27, 2021
2c45c57
improve
weizijun Oct 28, 2021
d2a00ec
Merge branch 'master' into tsdb-deal-timestreamp
elasticmachine Oct 28, 2021
d631649
Merge branch 'master' into tsdb-deal-timestreamp
elasticmachine Nov 2, 2021
1dcebb0
fix bwc
weizijun Nov 2, 2021
79db882
improve
weizijun Nov 3, 2021
69df14d
Maybe like this?
nik9000 Nov 3, 2021
6c77b18
fix failed tests
weizijun Nov 11, 2021
40f2e82
fix failed tests
weizijun Nov 11, 2021
582588f
fix failed tests
weizijun Nov 11, 2021
bfe25c5
improve
weizijun Nov 11, 2021
6350ee1
checkStyle
weizijun Nov 11, 2021
a5a116f
spotless
weizijun Nov 11, 2021
7236ee1
fixex tsdb settings tests
weizijun Nov 11, 2021
20c21a8
Merge branch 'master' into tsdb-deal-timestreamp
nik9000 Nov 11, 2021
9e92d09
Fixup after merge
nik9000 Nov 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,50 @@ no partitioning:
mode: time_series
shards: 5
routing_partition_size: 2

---
set start_time and end_time:
- skip:
version: " - 7.99.99"
reason: introduced in 8.0.0
- do:
indices.create:
index: test_index
body:
settings:
index:
mode: time_series
time_series:
start_time: 1632625782000
end_time: 1632625792000

- do:
indices.put_settings:
index: test_index
body:
index:
time_series:
start_time: 1632625781000
end_time: 1632625793000

- do:
catch: /index.time_series.start_time must be smaller than pre value \[1632625781000\]/
indices.put_settings:
index: test_index
body:
index:
time_series:
start_time: 1632625783000

- do:
catch: /index.time_series.end_time must be larger than pre value \[1632625793000\]/
indices.put_settings:
index: test_index
body:
index:
time_series:
end_time: 1632625792000

- do:
indices.delete:
index: test_index
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ private static Set<Setting<?>> builtInIndexSettings() {
}
Set<Setting<?>> result = new HashSet<>(ALWAYS_ENABLED_BUILT_IN_INDEX_SETTINGS);
result.add(IndexSettings.MODE);
result.add(IndexSettings.TIME_SERIES_START_TIME);
result.add(IndexSettings.TIME_SERIES_END_TIME);
return Set.copyOf(result);
}

Expand Down
52 changes: 52 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@

package org.elasticsearch.index;

import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexableField;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.DocumentParserContext;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.core.TimeValue.NSEC_PER_MSEC;

/**
* "Mode" that controls which behaviors and settings an index supports.
Expand All @@ -26,8 +32,13 @@ public enum IndexMode {
STANDARD {
@Override
void validateWithOtherSettings(Map<Setting<?>, Object> settings) {}

@Override
public void validateWithSource(DocumentParserContext context) {}
},
TIME_SERIES {
public static final String TIMESTAMP_FIELD = "@timestamp";

@Override
void validateWithOtherSettings(Map<Setting<?>, Object> settings) {
if (settings.get(IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING) != Integer.valueOf(1)) {
Expand All @@ -40,6 +51,45 @@ void validateWithOtherSettings(Map<Setting<?>, Object> settings) {
}
}

@Override
public void validateWithSource(DocumentParserContext context) {
validateTimestamp(context);
}

private void validateTimestamp(DocumentParserContext context) {
IndexableField[] fields = context.rootDoc().getFields(TIMESTAMP_FIELD);
weizijun marked this conversation as resolved.
Show resolved Hide resolved
if (fields.length == 0) {
throw new IllegalArgumentException("time series index @timestamp field is missing");
}

long numberOfValues = Arrays.stream(fields)
.filter(indexableField -> indexableField.fieldType().docValuesType() == DocValuesType.SORTED_NUMERIC)
.count();
if (numberOfValues > 1) {
throw new IllegalArgumentException("time series index @timestamp field encountered multiple values");
}

long timestamp = fields[0].numericValue().longValue();
if (context.mappingLookup().getMapper(TIMESTAMP_FIELD).typeName().equals(DateFieldMapper.DATE_NANOS_CONTENT_TYPE)) {
timestamp /= NSEC_PER_MSEC;
}

long startTime = context.indexSettings().getTimeSeriesStartTime();
long endTime = context.indexSettings().getTimeSeriesEndTime();

if (timestamp < startTime) {
throw new IllegalArgumentException(
"time series index @timestamp value [" + timestamp + "] must be larger than " + startTime
);
}

if (timestamp > endTime) {
weizijun marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalArgumentException(
"time series index @timestamp value [" + timestamp + "] must be smaller than " + endTime
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be better to do this validation any time the settings are present and just have IndexMode make sure the settings are present.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when IndexMode is time_series, start_time has a default value: Instant.ofEpochMilli(0), end_time also has a default value: DateUtils.MAX_NANOSECOND_INSTANT.
and these settings can only be set in time_series mode

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I meant that it's better to run the validation if the max and min are set rather than I the mode is time series. I know one implies the other, but I'd feel more comfortable if the field mapper didn't have to know about time series mode. It only knows about the setting.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You pinged me and asked for another review but haven't change this so I'll restate my objections - I feel like, as much as possible, IndexMode should be about making sure that the appropriate settings are set and the appropriate mapping is in place. Then we can let the mapping do the assertions. So I think it's more right to do the test in DataStreamTimestampFieldMapper and do it something like:

if (context.settings.startTime.isSet) {
  if (time < context.settings.startTime) {
...

I like this because a reader doesn't have to know what time series mode is - they just have to know that they is some way to set the min and max time. If they need to figure out how to set it then they can. But lots of times I find myself not caring where a behavior comes from, just that we have this behavior.

}
}

private String error(Setting<?> unsupported) {
return "[" + IndexSettings.MODE.getKey() + "=time_series] is incompatible with [" + unsupported.getKey() + "]";
}
Expand All @@ -57,4 +107,6 @@ private String error(Setting<?> unsupported) {
);

abstract void validateWithOtherSettings(Map<Setting<?>, Object> settings);

public abstract void validateWithSource(DocumentParserContext context);
}
61 changes: 61 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,28 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

/**
* in time series mode, the start time of the index, timestamp must larger than start_time
*/
public static final Setting<Long> TIME_SERIES_START_TIME = Setting.longSetting(
weizijun marked this conversation as resolved.
Show resolved Hide resolved
"index.time_series.start_time",
-1L,
-1L,
Property.IndexScope,
Property.Dynamic
weizijun marked this conversation as resolved.
Show resolved Hide resolved
);

/**
* in time series mode, the end time of the index, timestamp must smaller than start_time
*/
public static final Setting<Long> TIME_SERIES_END_TIME = Setting.longSetting(
"index.time_series.end_time",
-1L,
-1L,
Property.IndexScope,
Property.Dynamic
);

private final Index index;
private final Version version;
private final Logger logger;
Expand All @@ -373,6 +395,8 @@ public Iterator<Setting<?>> settings() {
* The {@link IndexMode "mode"} of the index.
*/
private final IndexMode mode;
private volatile long timeSeriesStartTime;
private volatile long timeSeriesEndTime;

// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
Expand Down Expand Up @@ -515,6 +539,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
mode = isTimeSeriesModeEnabled() ? scopedSettings.get(MODE) : IndexMode.STANDARD;
timeSeriesStartTime = TIME_SERIES_START_TIME.get(settings);
timeSeriesEndTime = TIME_SERIES_END_TIME.get(settings);

this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
Expand Down Expand Up @@ -614,6 +640,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DEPTH_LIMIT_SETTING, this::setMappingDepthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DIMENSION_FIELDS_LIMIT_SETTING, this::setMappingDimensionFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(TIME_SERIES_START_TIME, this::updateTimeSeriesStartTime);
scopedSettings.addSettingsUpdateConsumer(TIME_SERIES_END_TIME, this::updateTimeSeriesEndTime);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
Expand Down Expand Up @@ -1092,4 +1120,37 @@ public long getMappingDimensionFieldsLimit() {
private void setMappingDimensionFieldsLimit(long value) {
this.mappingDimensionFieldsLimit = value;
}

public long getTimeSeriesStartTime() {
return timeSeriesStartTime;
}

public void updateTimeSeriesStartTime(long startTime) {
if (timeSeriesStartTime < 0) {
throw new IllegalArgumentException("index.time_series.start_time not set before, can not update value");
}

if (this.timeSeriesStartTime < startTime) {
throw new IllegalArgumentException(
"index.time_series.start_time must be smaller than pre value [" + this.timeSeriesStartTime + "]"
);
}
this.timeSeriesStartTime = startTime;
}

public long getTimeSeriesEndTime() {
return timeSeriesEndTime;
}

public void updateTimeSeriesEndTime(long endTime) {
if (timeSeriesEndTime < 0) {
throw new IllegalArgumentException("index.time_series.end_time not set before, can not update value");
}

if (this.timeSeriesEndTime > endTime) {
throw new IllegalArgumentException("index.time_series.end_time must be larger than pre value [" + this.timeSeriesEndTime + "]");
weizijun marked this conversation as resolved.
Show resolved Hide resolved
}

this.timeSeriesEndTime = endTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ private static void internalParseDocument(RootObjectMapper root, MetadataFieldMa

executeIndexTimeScripts(context);

context.indexSettings().getMode().validateWithSource(context);

for (MetadataFieldMapper metadataMapper : metadataFieldsMappers) {
metadataMapper.postParse(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.hamcrest.Matchers;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -545,4 +547,66 @@ public void testCustomDataPathDeprecated() {
assertThat(indexSettings.hasCustomDataPath(), is(true));
assertSettingDeprecationsAndWarnings(new Setting<?>[] { IndexMetadata.INDEX_DATA_PATH_SETTING });
}

public void testUpdateTimeSeriesTimeRange() {
long startTime = Math.min(randomMillisUpToYear9999(), DateUtils.MAX_MILLIS_BEFORE_9999 - 86400000);
weizijun marked this conversation as resolved.
Show resolved Hide resolved
long endTime = startTime + randomLongBetween(1000, 86400000);
final Settings settings = Settings.builder()
.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), startTime)
.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), endTime)
.build();
IndexMetadata metadata = newIndexMeta("test", settings);
IndexSettings indexSettings = new IndexSettings(metadata, Settings.EMPTY);

// test update start_time
{
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> indexSettings.updateTimeSeriesStartTime(startTime + randomLongBetween(10, 1000))
);
assertThat(e.getMessage(), Matchers.containsString("index.time_series.start_time must be smaller than pre value"));

// success
long newStartTime = Math.max(startTime - randomLongBetween(10, 1000), 0);
indexSettings.updateTimeSeriesStartTime(newStartTime);
assertEquals(newStartTime, indexSettings.getTimeSeriesStartTime());
}

// test update end_time
{
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> indexSettings.updateTimeSeriesEndTime(endTime - randomLongBetween(10, 1000))
);
assertThat(e.getMessage(), Matchers.containsString("index.time_series.end_time must be larger than pre value"));

// success
long newEndTime = endTime + randomLongBetween(10, 1000);
indexSettings.updateTimeSeriesEndTime(newEndTime);
assertEquals(newEndTime, indexSettings.getTimeSeriesEndTime());
}
}

public void testUpdateTimeSeriesTimeRangeNotSet() {
final Settings settings = Settings.builder().build();
IndexMetadata metadata = newIndexMeta("test", settings);
IndexSettings indexSettings = new IndexSettings(metadata, Settings.EMPTY);

{
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> indexSettings.updateTimeSeriesStartTime(randomLongBetween(10, 1000))
);
assertThat(e.getMessage(), Matchers.containsString("index.time_series.start_time not set before, can not update value"));
}

{
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> indexSettings.updateTimeSeriesEndTime(randomLongBetween(10, 1000))
);

assertThat(e.getMessage(), Matchers.containsString("index.time_series.end_time not set before, can not update value"));
}
}
}
Loading