Merged
docs/changelog/138929.yaml (5 changes: 5 additions & 0 deletions)
@@ -0,0 +1,5 @@
pr: 138929
summary: Do not pack non-dimension fields in TS
area: ES|QL
type: bug
issues: []
@@ -9,22 +9,27 @@

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.compute.lucene.LuceneSliceQueue;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.OperatorStatus;
import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.hamcrest.Matchers;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -53,6 +58,8 @@ public void testEmpty() {
.setMapping(
"@timestamp",
"type=date",
"project",
"type=keyword",
"host",
"type=keyword,time_series_dimension=true",
"cpu",
@@ -62,7 +69,15 @@
run("TS empty_index | LIMIT 1").close();
}

-record Doc(String host, String cluster, long timestamp, int requestCount, double cpu, ByteSizeValue memory) {}
+record Doc(
+    Collection<String> project,
+    String host,
+    String cluster,
+    long timestamp,
+    int requestCount,
+    double cpu,
+    ByteSizeValue memory
+) {}

final List<Doc> docs = new ArrayList<>();

@@ -98,6 +113,8 @@ public void populateIndex() {
.setMapping(
"@timestamp",
"type=date",
"project",
"type=keyword",
"host",
"type=keyword,time_series_dimension=true",
"cluster",
@@ -118,6 +135,7 @@ public void populateIndex() {
int numDocs = between(20, 100);
docs.clear();
Map<String, Integer> requestCounts = new HashMap<>();
List<String> allProjects = List.of("project-1", "project-2", "project-3");
for (int i = 0; i < numDocs; i++) {
List<String> hosts = randomSubsetOf(between(1, hostToClusters.size()), hostToClusters.keySet());
timestamp += between(1, 10) * 1000L;
@@ -131,7 +149,8 @@
});
int cpu = randomIntBetween(0, 100);
ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
-docs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
+List<String> projects = randomSubsetOf(between(1, 3), allProjects);
+docs.add(new Doc(projects, host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
}
}
Randomness.shuffle(docs);
@@ -140,6 +159,8 @@
.setSource(
"@timestamp",
doc.timestamp,
"project",
doc.project,
"host",
doc.host,
"cluster",
@@ -672,7 +693,8 @@ public void testNullMetricsAreSkipped() {
});
int cpu = randomIntBetween(0, 100);
ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
-sparseDocs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
+String project = randomFrom("project-1", "project-2", "project-3");
+sparseDocs.add(new Doc(List.of(project), host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
}

Randomness.shuffle(sparseDocs);
@@ -729,4 +751,136 @@ public void testTSIDMetadataAttribute() {
}
}

public void testGroupByProject() {
record TimeSeries(String cluster, String host) {

}
record Sample(int count, double sum) {

}
Map<TimeSeries, Tuple<Sample, Set<String>>> buckets = new HashMap<>();
for (Doc doc : docs) {
TimeSeries timeSeries = new TimeSeries(doc.cluster, doc.host);
buckets.compute(timeSeries, (k, v) -> {
if (v == null) {
return Tuple.tuple(new Sample(1, doc.cpu), Set.copyOf(doc.project));
} else {
Set<String> projects = Sets.union(v.v2(), Sets.newHashSet(doc.project));
return Tuple.tuple(new Sample(v.v1().count + 1, v.v1().sum + doc.cpu), projects);
}
});
}
try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project")) {
Map<String, Integer> countPerProject = new HashMap<>();
Map<String, Double> sumOfAvgPerProject = new HashMap<>();
for (var e : buckets.entrySet()) {
Sample sample = e.getValue().v1();
for (String project : e.getValue().v2()) {
countPerProject.merge(project, 1, Integer::sum);
sumOfAvgPerProject.merge(project, sample.sum / sample.count, Double::sum);
}
}
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows, hasSize(sumOfAvgPerProject.size()));
for (List<Object> r : rows) {
String project = (String) r.get(1);
double actualAvg = (Double) r.get(0);
double expectedAvg = sumOfAvgPerProject.get(project) / countPerProject.get(project);
assertThat(actualAvg, closeTo(expectedAvg, 0.5));
}
}
try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project, cluster")) {
record Key(String project, String cluster) {

}
Map<Key, Integer> countPerProject = new HashMap<>();
Map<Key, Double> sumOfAvgPerProject = new HashMap<>();
for (var e : buckets.entrySet()) {
Sample sample = e.getValue().v1();
for (String project : e.getValue().v2()) {
Key key = new Key(project, e.getKey().cluster);
countPerProject.merge(key, 1, Integer::sum);
sumOfAvgPerProject.merge(key, sample.sum / sample.count, Double::sum);
}
}
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows, hasSize(sumOfAvgPerProject.size()));
for (List<Object> r : rows) {
Key key = new Key((String) r.get(1), (String) r.get(2));
double actualAvg = (Double) r.get(0);
double expectedAvg = sumOfAvgPerProject.get(key) / countPerProject.get(key);
assertThat(actualAvg, closeTo(expectedAvg, 0.5));
}
}
}

public void testGroupByProjectAndTBucket() {
record TimeSeries(String cluster, String host, String tbucket) {

}
record Sample(int count, double sum) {

}
Map<TimeSeries, Tuple<Sample, Set<String>>> buckets = new HashMap<>();
var rounding = new Rounding.Builder(TimeValue.timeValueMillis(TimeValue.timeValueMinutes(1).millis())).build().prepareForUnknown();
for (Doc doc : docs) {
var tbucket = DEFAULT_DATE_TIME_FORMATTER.formatMillis(rounding.round(doc.timestamp));
TimeSeries timeSeries = new TimeSeries(doc.cluster, doc.host, tbucket);
buckets.compute(timeSeries, (k, v) -> {
if (v == null) {
return Tuple.tuple(new Sample(1, doc.cpu), Set.copyOf(doc.project));
} else {
Set<String> projects = Sets.union(v.v2(), Sets.newHashSet(doc.project));
return Tuple.tuple(new Sample(v.v1().count + 1, v.v1().sum + doc.cpu), projects);
}
});
}
try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project, tbucket(1minute)")) {
record Key(String project, String tbucket) {

}
Map<Key, Integer> countPerProject = new HashMap<>();
Map<Key, Double> sumOfAvgPerProject = new HashMap<>();
for (var e : buckets.entrySet()) {
Sample sample = e.getValue().v1();
for (String project : e.getValue().v2()) {
Key key = new Key(project, e.getKey().tbucket);
countPerProject.merge(key, 1, Integer::sum);
sumOfAvgPerProject.merge(key, sample.sum / sample.count, Double::sum);
}
}
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows, hasSize(sumOfAvgPerProject.size()));
for (List<Object> r : rows) {
Key key = new Key((String) r.get(1), (String) r.get(2));
double actualAvg = (Double) r.get(0);
double expectedAvg = sumOfAvgPerProject.get(key) / countPerProject.get(key);
assertThat(actualAvg, closeTo(expectedAvg, 0.5));
}
}
try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY tbucket(1minute), project, cluster")) {
record Key(String project, String cluster, String tbucket) {

}
Map<Key, Integer> countPerProject = new HashMap<>();
Map<Key, Double> sumOfAvgPerProject = new HashMap<>();
for (var e : buckets.entrySet()) {
Sample sample = e.getValue().v1();
for (String project : e.getValue().v2()) {
Key key = new Key(project, e.getKey().cluster, e.getKey().tbucket);
countPerProject.merge(key, 1, Integer::sum);
sumOfAvgPerProject.merge(key, sample.sum / sample.count, Double::sum);
}
}
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows, hasSize(sumOfAvgPerProject.size()));
for (List<Object> r : rows) {
Key key = new Key((String) r.get(2), (String) r.get(3), (String) r.get(1));
double actualAvg = (Double) r.get(0);
double expectedAvg = sumOfAvgPerProject.get(key) / countPerProject.get(key);
assertThat(actualAvg, closeTo(expectedAvg, 0.5));
}
}
}
Contributor:
Shall we add a test with grouping on a multi-value label (non-dimension)? It'll unroll; this is OK.

Member (Author):
Good point. I will make the project a multi-valued field.
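
For intuition, a minimal standalone sketch of the unrolling described above (hypothetical values, not part of this PR): a time series whose docs carry a multi-value label such as project = [project-1, project-2] contributes its per-series average to each of those groups, which is what the expectation loops in testGroupByProject mirror with Sets.union.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class UnrollSketch {
        public static void main(String[] args) {
            // One time series with avg_over_time(cpu) == 40.0, labelled with two projects.
            List<String> projects = List.of("project-1", "project-2");
            double seriesAvg = 40.0;

            Map<String, Double> sumOfAvgPerProject = new HashMap<>();
            Map<String, Integer> countPerProject = new HashMap<>();
            for (String p : projects) { // the "unroll": one group entry per label value
                sumOfAvgPerProject.merge(p, seriesAvg, Double::sum);
                countPerProject.merge(p, 1, Integer::sum);
            }

            // The series counts once in every group it belongs to, e.g.
            // {project-1=40.0, project-2=40.0} and {project-1=1, project-2=1}.
            System.out.println(sumOfAvgPerProject);
            System.out.println(countPerProject);
        }
    }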


}
@@ -300,7 +300,9 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
}
});
NamedExpression timeBucket = timeBucketRef.get();
-for (var group : aggregate.groupings()) {
+boolean[] packPositions = new boolean[aggregate.groupings().size()];
+for (int i = 0; i < aggregate.groupings().size(); i++) {
+    var group = aggregate.groupings().get(i);
if (group instanceof Attribute == false) {
throw new EsqlIllegalArgumentException("expected named expression for grouping; got " + group);
}
@@ -312,21 +314,26 @@
} else {
var valuesAgg = new Alias(g.source(), g.name(), valuesAggregate(context, g));
firstPassAggs.add(valuesAgg);
-Alias pack = new Alias(
-    g.source(),
-    internalNames.next("pack" + g.name()),
-    new PackDimension(g.source(), valuesAgg.toAttribute())
-);
-packDimensions.add(pack);
-Alias grouping = new Alias(g.source(), internalNames.next("group" + g.name()), pack.toAttribute());
-secondPassGroupings.add(grouping);
-Alias unpack = new Alias(
-    g.source(),
-    g.name(),
-    new UnpackDimension(g.source(), grouping.toAttribute(), g.dataType().noText()),
-    g.id()
-);
-unpackDimensions.add(unpack);
+if (g.isDimension()) {
+    Alias pack = new Alias(
+        g.source(),
+        internalNames.next("pack_" + g.name()),
+        new PackDimension(g.source(), valuesAgg.toAttribute())
+    );
+    packDimensions.add(pack);
+    Alias grouping = new Alias(g.source(), internalNames.next("group_" + g.name()), pack.toAttribute());
+    secondPassGroupings.add(grouping);
+    Alias unpack = new Alias(
+        g.source(),
+        g.name(),
+        new UnpackDimension(g.source(), grouping.toAttribute(), g.dataType().noText()),
+        g.id()
+    );
+    unpackDimensions.add(unpack);
+    packPositions[i] = true;
+} else {
+    secondPassGroupings.add(new Alias(g.source(), g.name(), valuesAgg.toAttribute(), g.id()));
+}
}
}
LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
@@ -365,13 +372,12 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
for (NamedExpression agg : secondPassAggs) {
projects.add(Expressions.attribute(agg));
}
-int pos = 0;
-for (Expression group : secondPassGroupings) {
-    Attribute g = Expressions.attribute(group);
-    if (timeBucket != null && g.id().equals(timeBucket.id())) {
-        projects.add(g);
+int packPos = 0;
+for (int i = 0; i < secondPassGroupings.size(); i++) {
+    if (packPositions[i]) {
+        projects.add(unpackDimensions.get(packPos++).toAttribute());
     } else {
-        projects.add(unpackDimensions.get(pos++).toAttribute());
+        projects.add(Expressions.attribute(secondPassGroupings.get(i)));
}
}
return new Project(newChild.source(), unpackValues, projects);
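
Why the new packPositions bookkeeping is needed: before this change every non-time-bucket grouping had a pack/unpack pair, so the final projection could consume unpackDimensions sequentially; now that non-dimension groupings skip packing, the projection has to realign the two lists by position. A minimal standalone sketch of that realignment (hypothetical names and values, not the PR's code):

    import java.util.ArrayList;
    import java.util.List;

    public class RealignSketch {
        public static void main(String[] args) {
            // Second-pass groupings in order; true = packed dimension, false = grouped directly.
            List<String> groupings = List.of("tbucket", "group_cluster", "project");
            boolean[] packed = { false, true, false };
            // One unpacked attribute per packed grouping, in the same relative order.
            List<String> unpacked = List.of("cluster");

            List<String> projections = new ArrayList<>();
            int packPos = 0;
            for (int i = 0; i < groupings.size(); i++) {
                // Packed groupings are projected through their unpacked form;
                // everything else is projected as-is.
                projections.add(packed[i] ? unpacked.get(packPos++) : groupings.get(i));
            }
            System.out.println(projections); // [tbucket, cluster, project]
        }
    }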