Skip to content

Commit

Permalink
TSDB: Basic rolling upgrade test (#78028)
Browse files Browse the repository at this point in the history
Adds an index in time_series mode to the rolling upgrade tests.
  • Loading branch information
nik9000 authored Oct 1, 2021
1 parent 9dda447 commit 37a23d7
Showing 1 changed file with 144 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,33 @@
*/
package org.elasticsearch.upgrades;

import io.github.nik9000.mapmatcher.ListMatcher;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.hamcrest.Matcher;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static io.github.nik9000.mapmatcher.ListMatcher.matchesList;
import static io.github.nik9000.mapmatcher.MapMatcher.assertMap;
import static io.github.nik9000.mapmatcher.MapMatcher.matchesMap;
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -228,12 +238,144 @@ private void bulk(String index, String valueSuffix, int count) throws IOExceptio
b.append("{\"index\": {\"_index\": \"").append(index).append("\"}}\n");
b.append("{\"f1\": \"v").append(i).append(valueSuffix).append("\", \"f2\": ").append(i).append("}\n");
}
bulk(index, b.toString());
}

private static final List<String> TSDB_DIMS = List.of("6a841a21", "947e4ced", "a4c385a1", "b47a2f4e", "df3145b3");
private static final long[] TSDB_TIMES;
static {
String[] times = new String[] {
"2021-01-01T00:00:00Z",
"2021-01-02T00:00:00Z",
"2021-01-02T00:10:00Z",
"2021-01-02T00:20:00Z",
"2021-01-02T00:30:00Z" };
TSDB_TIMES = new long[times.length];
for (int i = 0; i < times.length; i++) {
TSDB_TIMES[i] = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis(times[i]);
}
}

public void testTsdb() throws IOException {
assumeTrue("tsdb added in 8.0.0", UPGRADE_FROM_VERSION.onOrAfter(Version.V_8_0_0));

StringBuilder bulk = new StringBuilder();
switch (CLUSTER_TYPE) {
case OLD:
createTsdbIndex();
tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[0], TSDB_TIMES[1], 0.1);
tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[0], TSDB_TIMES[1], -0.1);
bulk("tsdb", bulk.toString());
assertTsdbAgg(closeTo(215.95, 0.005), closeTo(-215.95, 0.005));
return;
case MIXED:
if (FIRST_MIXED_ROUND) {
tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[1], TSDB_TIMES[2], 0.1);
tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[1], TSDB_TIMES[2], -0.1);
tsdbBulk(bulk, TSDB_DIMS.get(2), TSDB_TIMES[0], TSDB_TIMES[2], 1.1);
bulk("tsdb", bulk.toString());
assertTsdbAgg(closeTo(217.45, 0.005), closeTo(-217.45, 0.005), closeTo(2391.95, 0.005));
return;
}
tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[2], TSDB_TIMES[3], 0.1);
tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[2], TSDB_TIMES[3], -0.1);
tsdbBulk(bulk, TSDB_DIMS.get(2), TSDB_TIMES[2], TSDB_TIMES[3], 1.1);
tsdbBulk(bulk, TSDB_DIMS.get(3), TSDB_TIMES[0], TSDB_TIMES[3], 10);
bulk("tsdb", bulk.toString());
assertTsdbAgg(closeTo(218.95, 0.005), closeTo(-218.95, 0.005), closeTo(2408.45, 0.005), closeTo(21895, 0.5));
return;
case UPGRADED:
tsdbBulk(bulk, TSDB_DIMS.get(0), TSDB_TIMES[3], TSDB_TIMES[4], 0.1);
tsdbBulk(bulk, TSDB_DIMS.get(1), TSDB_TIMES[3], TSDB_TIMES[4], -0.1);
tsdbBulk(bulk, TSDB_DIMS.get(2), TSDB_TIMES[3], TSDB_TIMES[4], 1.1);
tsdbBulk(bulk, TSDB_DIMS.get(3), TSDB_TIMES[3], TSDB_TIMES[4], 10);
tsdbBulk(bulk, TSDB_DIMS.get(4), TSDB_TIMES[0], TSDB_TIMES[4], -5);
bulk("tsdb", bulk.toString());
assertTsdbAgg(
closeTo(220.45, 0.005),
closeTo(-220.45, 0.005),
closeTo(2424.95, 0.005),
closeTo(22045, 0.5),
closeTo(-11022.5, 0.5)
);
return;
}
}

private void bulk(String index, String entity) throws IOException {
Request bulk = new Request("POST", "/_bulk");
bulk.addParameter("refresh", "true");
bulk.setJsonEntity(b.toString());
bulk.setJsonEntity(entity.toString());
client().performRequest(bulk);
}

private void createTsdbIndex() throws IOException {
Request createIndex = new Request("PUT", "/tsdb");
XContentBuilder indexSpec = XContentBuilder.builder(XContentType.JSON.xContent()).startObject();
indexSpec.startObject("mappings").startObject("properties");
{
indexSpec.startObject("@timestamp").field("type", "date").endObject();
indexSpec.startObject("dim").field("type", "keyword").field("time_series_dimension", true).endObject();
}
indexSpec.endObject().endObject();
indexSpec.startObject("settings").field("mode", "time_series").endObject();
createIndex.setJsonEntity(Strings.toString(indexSpec.endObject()));
client().performRequest(createIndex);
}

private void tsdbBulk(StringBuilder bulk, String dim, long timeStart, long timeEnd, double rate) throws IOException {
long delta = TimeUnit.SECONDS.toMillis(20);
double value = (timeStart - TSDB_TIMES[0]) / TimeUnit.SECONDS.toMillis(20) * rate;
for (long t = timeStart; t < timeEnd; t += delta) {
bulk.append("{\"index\": {\"_index\": \"tsdb\"}}\n");
bulk.append("{\"@timestamp\": ").append(t);
bulk.append(", \"dim\": \"").append(dim).append("\"");
bulk.append(", \"value\": ").append(value).append("}\n");
value += rate;
}
}

private void assertTsdbAgg(Matcher<?>... expected) throws IOException {
Request request = new Request("POST", "/tsdb/_search");
request.addParameter("size", "0");
XContentBuilder body = JsonXContent.contentBuilder().startObject();
// TODO replace tsid runtime field with real tsid
body.startObject("runtime_mappings");
{
body.startObject("tsid");
{
body.field("type", "keyword");
body.field("script", "emit('dim:' + doc['dim'].value)");
}
body.endObject();
}
body.endObject();
body.startObject("aggs").startObject("tsids");
{
body.startObject("terms").field("field", "tsid").endObject();
body.startObject("aggs").startObject("avg");
{
body.startObject("avg").field("field", "value").endObject();
}
body.endObject().endObject();
}
body.endObject().endObject();
request.setJsonEntity(Strings.toString(body.endObject()));
ListMatcher tsidsExpected = matchesList();
for (int d = 0; d < expected.length; d++) {
// Object key = Map.of("dim", TSDB_DIMS.get(d)); TODO use this once tsid is real
Object key = "dim:" + TSDB_DIMS.get(d);
tsidsExpected = tsidsExpected.item(
matchesMap().extraOk().entry("key", key).entry("avg", Map.of("value", expected[d]))
);
}
assertMap(
entityAsMap(client().performRequest(request)),
matchesMap().extraOk()
.entry("aggregations", matchesMap().entry("tsids", matchesMap().extraOk().entry("buckets", tsidsExpected)))
);
}

private void assertCount(String index, int count) throws IOException {
Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search");
searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
Expand Down

0 comments on commit 37a23d7

Please sign in to comment.