Use @timestamp field to route documents to a backing index of a data stream #82079
Conversation
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser);
String timestampAsString = parser.text();
// TODO: deal with nanos too here.
We could add something (a format string) to MappingMetadata that indicates how the @timestamp field should be parsed. We would need to fetch this from the latest backing index of a data stream. Alternatively, we can add the information about how the @timestamp field should be parsed to the DataStream class, which feels like a better place, since we know this prior to selecting the right backing index based on the @timestamp field here.
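For illustration, a minimal sketch of the second option, assuming a hypothetical timestampFormat value carried by the data stream (the field name and helper below are made up and are not the actual DataStream API):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class TimestampFormatSketch {

    // If the data stream carried its own @timestamp format, the routing code could parse the field
    // directly with it instead of fetching the format from the latest backing index's MappingMetadata.
    static Instant parseTimestamp(String timestampAsString, String timestampFormat) {
        DateTimeFormatter formatter = timestampFormat != null
            ? DateTimeFormatter.ofPattern(timestampFormat).withZone(ZoneOffset.UTC)
            : DateTimeFormatter.ISO_INSTANT; // fall back to strict ISO 8601
        return Instant.from(formatter.parse(timestampAsString));
    }
}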
Pinging @elastic/es-analytics-geo (Team:Analytics)
Pinging @elastic/es-data-management (Team:Data Management)
Looks good to me in general from the TSDB perspective. Left a couple of suggestions.
Index result = dataStream.selectWriteIndex(timestamp, metadata);
if (result == null) {
    throw new IllegalArgumentException("no index available for a document with an @timestamp of [" + timestampAsString + "]");
It would be great if we could add a bit more useful information here. I think I would rephrase it as "the document timestamp [2022-01-07T19:04:41Z] is outside of ranges of currently writable indices: [[2022-01-06T00:00:00.000Z-2022-01-06T16:02:12.251Z], [2022-01-06T16:02:12.251Z-2022-01-06T20:02:12.251Z]]" or something like this.
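A rough sketch of building such a message (the TimeRange record and helper are illustrative only, not the actual implementation):

import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

class TimestampErrorMessageSketch {

    record TimeRange(Instant start, Instant end) {}

    // Illustrative only: renders the suggested error message from the writable backing indices' time ranges.
    static String outOfRangeMessage(Instant docTimestamp, List<TimeRange> writableRanges) {
        String ranges = writableRanges.stream()
            .map(r -> "[" + r.start() + "-" + r.end() + "]")
            .collect(Collectors.joining(", ", "[", "]"));
        return "the document timestamp [" + docTimestamp + "] is outside of ranges of currently writable indices: " + ranges;
    }
}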
Also implemented via: f97d889
@@ -718,6 +733,40 @@ public boolean isRequireAlias() {
        return requireAlias;
    }

    @Override
    public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
It feels like a bit too much logic for a data class, which the IndexRequest essentially is. I wonder if it makes more sense as part of IndexAbstraction instead.
Implemented via: f97d889
I left a few really minor comments, but otherwise this looks good to me.
Temporal slices of backing indices never overlap within a data stream, so either one backing index can be selected or none.
Can you point me to where we do this validation? I wanted to get more familiar with it.
@@ -151,6 +154,10 @@
     */
    int route(IndexRouting indexRouting);

    default Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
Can you add javadocs for this?
Added via: efdd833
try (XContentParser parser = contentType.xContent().createParser(TS_EXTRACT_CONFIG, source().streamInput())) {
    ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
    ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
    ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser);
I think we need to support epoch millis here also, correct? I tested it locally and it works, but I'm not sure why this doesn't blow up, since I would expect it to fail when a document with "@timestamp": 12309123 is indexed.
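As a hedged sketch of the idea, handling both representations could look roughly like this (a stand-alone helper, not the actual XContentParser wiring):

import java.time.Instant;
import java.time.format.DateTimeFormatter;

class TimestampValueSketch {

    // Sketch only: accept @timestamp either as an ISO 8601 string or as a number of
    // milliseconds since the epoch, as discussed above.
    static Instant parse(Object timestampValue) {
        if (timestampValue instanceof Number number) {
            return Instant.ofEpochMilli(number.longValue());
        }
        return Instant.from(DateTimeFormatter.ISO_INSTANT.parse(timestampValue.toString()));
    }
}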
@@ -171,6 +175,32 @@ public Index getWriteIndex() {
        return indices.get(indices.size() - 1);
    }

    public Index selectWriteIndex(Instant timestamp, Metadata metadata) {
Can you rename this to be something TSDB specific, like selectTimeseriesWriteIndex (since we may end up adding different selection criteria in the future), and add javadocs?
Renamed and added javadocs: 36af27e
@@ -272,4 +281,102 @@ public void testRejectsEmptyStringPipeline() {
        assertThat(validate, notNullValue());
        assertThat(validate.getMessage(), containsString("pipeline cannot be an empty string"));
    }

    public void testGetConcreteWriteIndex() {
Can you add a test that uses epoch millis for the @timestamp instead of a string, to ensure that it picks the right backing index also?
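A minimal sketch of what such a test could assert, using plain JUnit and the hypothetical TimestampValueSketch helper from the earlier sketch rather than the real IndexRequest/DataStream test setup:

import static org.junit.Assert.assertEquals;

import java.time.Instant;
import org.junit.Test;

public class EpochMillisTimestampSketchTests {

    // Sketch only: an epoch-millis @timestamp should resolve to the same instant as its
    // ISO 8601 string form, so both representations should route to the same backing index.
    @Test
    public void testEpochMillisMatchesStringTimestamp() {
        Instant fromString = TimestampValueSketch.parse("2022-01-06T12:00:00.000Z");
        Instant fromMillis = TimestampValueSketch.parse(fromString.toEpochMilli());
        assertEquals(fromString, fromMillis);
    }
}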
I added a test for this and also added the logic for it. The case of providing the timestamp as a number representing millis since the epoch failed. I fixed this via: d70aa28
The validation of the start and end time settings across backing indices doesn't exist yet. But maybe we can do this differently, now that we are going to make time series typed data streams. So I'd like to add this validation after we've made data streams aware of index modes.
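For reference, a rough sketch of what that future validation could check, using a simple TimeRange pair as in the earlier sketches (not code from this PR):

import java.time.Instant;
import java.util.Comparator;
import java.util.List;

class BackingIndexRangeValidationSketch {

    record TimeRange(Instant start, Instant end) {}

    // Sketch only: the time ranges of a data stream's backing indices must not overlap,
    // so that a document timestamp matches at most one backing index.
    static void validateNoOverlaps(List<TimeRange> ranges) {
        List<TimeRange> sorted = ranges.stream().sorted(Comparator.comparing(TimeRange::start)).toList();
        for (int i = 1; i < sorted.size(); i++) {
            TimeRange previous = sorted.get(i - 1);
            TimeRange current = sorted.get(i);
            if (current.start().isBefore(previous.end())) {
                throw new IllegalArgumentException("backing index time ranges overlap: ["
                    + previous.start() + "-" + previous.end() + "] and ["
                    + current.start() + "-" + current.end() + "]");
            }
        }
    }
}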
LGTM, Thanks!
LGTM also!
So I'd like to add this validation after we've made data streams aware of index modes.
Makes sense, thanks for the heads up.
Currently, documents that target data streams are resolved to the write index of the data stream being targeted.
This change adjusts this logic in the bulk API to first parse the @timestamp field and then, based on this timestamp, select the right backing index. If the parsed timestamp of a document falls between a backing index's start_time and end_time, then that backing index is used as the write index. Note that this logic is only enabled for TSDB data streams. Temporal slices of backing indices never overlap within a data stream, so either one backing index can be selected or none.
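A hedged sketch of the selection described above, with a made-up BackingIndex stand-in instead of the real DataStream/Metadata types:

import java.time.Instant;
import java.util.List;

class WriteIndexSelectionSketch {

    // Hypothetical stand-in for a backing index and its time-series start/end settings.
    record BackingIndex(String name, Instant startTime, Instant endTime) {}

    // Because temporal slices never overlap, at most one backing index can match;
    // null means no backing index covers the timestamp and indexing should fail.
    static BackingIndex selectTimeSeriesWriteIndex(Instant timestamp, List<BackingIndex> backingIndices) {
        for (BackingIndex index : backingIndices) {
            boolean inRange = timestamp.equals(index.startTime())
                || (timestamp.isAfter(index.startTime()) && timestamp.isBefore(index.endTime()));
            if (inRange) {
                return index;
            }
        }
        return null;
    }
}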
Relates #74660