Skip to content

Commit

Permalink
Add logic so that timestamp can be parsed as millis since epoch provi…
Browse files Browse the repository at this point in the history
…ded as number.
  • Loading branch information
martijnvg committed Jan 10, 2022
1 parent 53dae1d commit d70aa28
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -751,14 +752,33 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
try (XContentParser parser = contentType.xContent().createParser(TS_EXTRACT_CONFIG, source().streamInput())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser);
String timestampAsString = parser.text();
// TODO: deal with nanos too here.
// (the index hasn't been resolved yet, keep track of timestamp field metadata at data stream level, so we can use it here)
Instant timestamp = Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(timestampAsString));
Instant timestamp;
switch (parser.nextToken()) {
case VALUE_STRING:
// TODO: deal with nanos too here.
// (the index hasn't been resolved yet, keep track of timestamp field metadata at data stream level, so we can use it
// here)
timestamp = Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text()));
break;
case VALUE_NUMBER:
timestamp = Instant.ofEpochMilli(parser.longValue());
break;
default:
throw new ParsingException(
parser.getTokenLocation(),
String.format(
Locale.ROOT,
"Failed to parse object: expecting token of type [%s] or [%s] but found [%s]",
XContentParser.Token.VALUE_STRING,
XContentParser.Token.VALUE_NUMBER,
parser.currentToken()
)
);
}

Index result = dataStream.selectWriteIndex(timestamp, metadata);
if (result == null) {
String timestampAsString = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(timestamp);
throw new IllegalArgumentException("no index available for a document with an @timestamp of [" + timestampAsString + "]");
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;

Expand Down Expand Up @@ -298,12 +299,12 @@ public void testGetConcreteWriteIndex() {

String source = """
{
"@timestamp": "$time"
"@timestamp": $time
}""";
{
// Not a create request => resolve to the latest backing index
IndexRequest request = new IndexRequest(tsdbDataStream);
request.source(source.replace("$time", formatInstant(start1)), XContentType.JSON);
request.source(renderSource(source, start1), XContentType.JSON);

var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(1)));
Expand All @@ -312,7 +313,7 @@ public void testGetConcreteWriteIndex() {
// Target is a regular index => resolve to this index only
String indexName = metadata.getIndices().keySet().iterator().next();
IndexRequest request = new IndexRequest(indexName);
request.source(source.replace("$time", formatInstant(randomFrom(start1, end1, start2, end2))), XContentType.JSON);
request.source(renderSource(source, randomFrom(start1, end1, start2, end2)), XContentType.JSON);

var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(indexName), metadata);
assertThat(result.getName(), equalTo(indexName));
Expand All @@ -336,7 +337,7 @@ public void testGetConcreteWriteIndex() {
.build();
// Target is a regular data stream => always resolve to the latest backing index
IndexRequest request = new IndexRequest(regularDataStream);
request.source(source.replace("$time", formatInstant(randomFrom(start1, end1, start2, end2))), XContentType.JSON);
request.source(renderSource(source, randomFrom(start1, end1, start2, end2)), XContentType.JSON);

var result = request.getConcreteWriteIndex(metadata2.getIndicesLookup().get(regularDataStream), metadata2);
assertThat(result.getName(), equalTo(backingIndex2.getIndex().getName()));
Expand All @@ -345,7 +346,16 @@ public void testGetConcreteWriteIndex() {
// provided timestamp resolves to the first backing index
IndexRequest request = new IndexRequest(tsdbDataStream);
request.opType(DocWriteRequest.OpType.CREATE);
request.source(source.replace("$time", formatInstant(start1)), XContentType.JSON);
request.source(renderSource(source, start1), XContentType.JSON);

var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(0)));
}
{
// provided timestamp as millis since epoch resolves to the first backing index
IndexRequest request = new IndexRequest(tsdbDataStream);
request.opType(DocWriteRequest.OpType.CREATE);
request.source(source.replace("$time", "" + start1.toEpochMilli()), XContentType.JSON);

var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(0)));
Expand All @@ -354,7 +364,7 @@ public void testGetConcreteWriteIndex() {
// provided timestamp resolves to the latest backing index
IndexRequest request = new IndexRequest(tsdbDataStream);
request.opType(DocWriteRequest.OpType.CREATE);
request.source(source.replace("$time", formatInstant(start2)), XContentType.JSON);
request.source(renderSource(source, start2), XContentType.JSON);

var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(1)));
Expand All @@ -363,7 +373,7 @@ public void testGetConcreteWriteIndex() {
// provided timestamp resolves to no index => fail with an exception
IndexRequest request = new IndexRequest(tsdbDataStream);
request.opType(DocWriteRequest.OpType.CREATE);
request.source(source.replace("$time", formatInstant(end2)), XContentType.JSON);
request.source(renderSource(source, end2), XContentType.JSON);

var e = expectThrows(
IllegalArgumentException.class,
Expand All @@ -376,6 +386,10 @@ public void testGetConcreteWriteIndex() {
}
}

static String renderSource(String sourceTemplate, Instant instant) {
return sourceTemplate.replace("$time", "\"" + formatInstant(instant) + "\"");
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}
Expand Down

0 comments on commit d70aa28

Please sign in to comment.