Skip to content

Commit

Permalink
TSDB: Truncate timestamps for routing purposes only (#84699)
Browse files Browse the repository at this point in the history
Truncate timestamp to second precision for resolving data stream to backing index only.
Also add support for parsing timestamps in date nanos format.

Closes #83517
  • Loading branch information
martijnvg authored Mar 17, 2022
1 parent 22824e4 commit 61cd70b
Show file tree
Hide file tree
Showing 7 changed files with 443 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,34 +159,34 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
}
""";

private static final String BULK =
"""
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}
""";

public void testTsdbDataStreams() throws Exception {
// Create a template
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
assertOK(client().performRequest(putComposableIndexTemplateRequest));

var bulkRequest = new Request("POST", "/k8s/_bulk");
bulkRequest.setJsonEntity(
"""
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2005177954, "rx": 801479970}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2006223737, "rx": 802337279}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.2", "network": {"tx": 2012916202, "rx": 803685721}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434521831, "rx": 530575198}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434577921, "rx": 530600088}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434587694, "rx": 530604797}}}}
{"create": {}}
{"@timestamp": "$now", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}
"""
.replace("$now", formatInstant(Instant.now()))
);
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
bulkRequest.addParameter("refresh", "true");
assertOK(client().performRequest(bulkRequest));

Expand Down Expand Up @@ -245,6 +245,72 @@ public void testTsdbDataStreams() throws Exception {
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));
}

public void testTsdbDataStreamsNanos() throws Exception {
// Create a template
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE.replace("date", "date_nanos"));
assertOK(client().performRequest(putComposableIndexTemplateRequest));

var bulkRequest = new Request("POST", "/k8s/_bulk");
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstantNanos(Instant.now())));
bulkRequest.addParameter("refresh", "true");
assertOK(client().performRequest(bulkRequest));

var getDataStreamsRequest = new Request("GET", "/_data_stream");
var response = client().performRequest(getDataStreamsRequest);
assertOK(response);
var dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("1"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
assertThat(firstBackingIndex, backingIndexEqualTo("k8s", 1));

var indices = getIndex(firstBackingIndex);
var escapedBackingIndex = firstBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeFirstBackingIndex, notNullValue());
String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeFirstBackingIndex, notNullValue());

var rolloverRequest = new Request("POST", "/k8s/_rollover");
assertOK(client().performRequest(rolloverRequest));

response = client().performRequest(getDataStreamsRequest);
assertOK(response);
dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(2));
String secondBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.1.index_name");
assertThat(secondBackingIndex, backingIndexEqualTo("k8s", 2));

indices = getIndex(secondBackingIndex);
escapedBackingIndex = secondBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
String startTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeSecondBackingIndex, equalTo(endTimeFirstBackingIndex));
String endTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeSecondBackingIndex, notNullValue());

var indexRequest = new Request("POST", "/k8s/_doc");
Instant time = parseInstant(startTimeFirstBackingIndex);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex));

indexRequest = new Request("POST", "/k8s/_doc");
time = parseInstant(endTimeSecondBackingIndex).minusMillis(1);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));
}

public void testSimulateTsdbDataStreamTemplate() throws Exception {
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
Expand Down Expand Up @@ -429,6 +495,10 @@ static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

static String formatInstantNanos(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME_NANOS.getName()).format(instant);
}

static Instant parseInstant(String input) {
return Instant.from(DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).parse(input));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.index.mapper.DateFieldMapper;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
Expand Down Expand Up @@ -59,8 +60,8 @@ public Settings getAdditionalIndexSettings(
final Instant start;
final Instant end;
if (dataStream == null || migrating) {
start = resolvedAt.minusMillis(lookAheadTime.getMillis());
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
start = resolvedAt.minusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
end = resolvedAt.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
} else {
IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex());
if (currentLatestBackingIndex.getSettings().hasValue(IndexSettings.TIME_SERIES_END_TIME.getKey()) == false) {
Expand All @@ -75,9 +76,9 @@ public Settings getAdditionalIndexSettings(
}
start = IndexSettings.TIME_SERIES_END_TIME.get(currentLatestBackingIndex.getSettings());
if (start.isAfter(resolvedAt)) {
end = start.plusMillis(lookAheadTime.getMillis());
end = start.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
} else {
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
end = resolvedAt.plusMillis(lookAheadTime.getMillis()).truncatedTo(ChronoUnit.SECONDS);
}
}
assert start.isBefore(end) : "data stream backing index's start time is not before end time";
Expand Down
Loading

0 comments on commit 61cd70b

Please sign in to comment.