diff --git a/docs/changelog/138929.yaml b/docs/changelog/138929.yaml
new file mode 100644
index 0000000000000..13ca3917e1b21
--- /dev/null
+++ b/docs/changelog/138929.yaml
@@ -0,0 +1,5 @@
+pr: 138929
+summary: Do not pack non-dimension fields in TS
+area: ES|QL
+type: bug
+issues: []
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java
index 0cfa17f6968f3..678ff2d4c76e9 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java
@@ -9,15 +9,19 @@
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.OperatorStatus;
 import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
@@ -25,6 +29,7 @@
 import org.junit.Before;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -53,6 +58,8 @@ public void testEmpty() {
             .setMapping(
                 "@timestamp",
                 "type=date",
+                "project",
+                "type=keyword",
                 "host",
                 "type=keyword,time_series_dimension=true",
                 "cpu",
@@ -62,7 +69,15 @@
         run("TS empty_index | LIMIT 1").close();
     }
 
-    record Doc(String host, String cluster, long timestamp, int requestCount, double cpu, ByteSizeValue memory) {}
+    record Doc(
+        Collection<String> project,
+        String host,
+        String cluster,
+        long timestamp,
+        int requestCount,
+        double cpu,
+        ByteSizeValue memory
+    ) {}
 
     final List<Doc> docs = new ArrayList<>();
 
@@ -98,6 +113,8 @@ public void populateIndex() {
             .setMapping(
                 "@timestamp",
                 "type=date",
+                "project",
+                "type=keyword",
                 "host",
                 "type=keyword,time_series_dimension=true",
                 "cluster",
@@ -118,6 +135,7 @@ public void populateIndex() {
         int numDocs = between(20, 100);
         docs.clear();
         Map<String, Integer> requestCounts = new HashMap<>();
+        List<String> allProjects = List.of("project-1", "project-2", "project-3");
         for (int i = 0; i < numDocs; i++) {
             List<String> hosts = randomSubsetOf(between(1, hostToClusters.size()), hostToClusters.keySet());
             timestamp += between(1, 10) * 1000L;
@@ -131,7 +149,8 @@
                 });
                 int cpu = randomIntBetween(0, 100);
                 ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
-                docs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
+                List<String> projects = randomSubsetOf(between(1, 3), allProjects);
+                docs.add(new Doc(projects, host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
             }
         }
         Randomness.shuffle(docs);
@@ -140,6 +159,8 @@ public void populateIndex() {
                 .setSource(
                     "@timestamp",
                     doc.timestamp,
+                    "project",
+                    doc.project,
                     "host",
                     doc.host,
                     "cluster",
@@ -672,7 +693,8 @@ public void testNullMetricsAreSkipped() {
             });
             int cpu = randomIntBetween(0, 100);
             ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
-            sparseDocs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
+            String project = randomFrom("project-1", "project-2", "project-3");
+            sparseDocs.add(new Doc(List.of(project), host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
         }
 
         Randomness.shuffle(sparseDocs);
@@ -729,4 +751,136 @@ public void testTSIDMetadataAttribute() {
         }
     }
 
+    public void testGroupByProject() {
+        record TimeSeries(String cluster, String host) {
+
+        }
+        record Sample(int count, double sum) {
+
+        }
+        Map<TimeSeries, Tuple<Sample, Set<String>>> buckets = new HashMap<>();
+        for (Doc doc : docs) {
+            TimeSeries timeSeries = new TimeSeries(doc.cluster, doc.host);
+            buckets.compute(timeSeries, (k, v) -> {
+                if (v == null) {
+                    return Tuple.tuple(new Sample(1, doc.cpu), Set.copyOf(doc.project));
+                } else {
+                    Set<String> projects = Sets.union(v.v2(), Sets.newHashSet(doc.project));
+                    return Tuple.tuple(new Sample(v.v1().count + 1, v.v1().sum + doc.cpu), projects);
+                }
+            });
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project")) {
+            Map<String, Integer> countPerProject = new HashMap<>();
+            Map<String, Double> sumOfAvgPerProject = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    countPerProject.merge(project, 1, Integer::sum);
+                    sumOfAvgPerProject.merge(project, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerProject.size()));
+            for (List<Object> r : rows) {
+                String project = (String) r.get(1);
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerProject.get(project) / countPerProject.get(project);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project, cluster")) {
+            record Key(String project, String cluster) {
+
+            }
+            Map<Key, Integer> countPerProject = new HashMap<>();
+            Map<Key, Double> sumOfAvgPerProject = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    Key key = new Key(project, e.getKey().cluster);
+                    countPerProject.merge(key, 1, Integer::sum);
+                    sumOfAvgPerProject.merge(key, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerProject.size()));
+            for (List<Object> r : rows) {
+                Key key = new Key((String) r.get(1), (String) r.get(2));
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerProject.get(key) / countPerProject.get(key);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+    }
+
+    public void testGroupByProjectAndTBucket() {
+        record TimeSeries(String cluster, String host, String tbucket) {
+
+        }
+        record Sample(int count, double sum) {
+
+        }
+        Map<TimeSeries, Tuple<Sample, Set<String>>> buckets = new HashMap<>();
+        var rounding = new Rounding.Builder(TimeValue.timeValueMillis(TimeValue.timeValueMinutes(1).millis())).build().prepareForUnknown();
+        for (Doc doc : docs) {
+            var tbucket = DEFAULT_DATE_TIME_FORMATTER.formatMillis(rounding.round(doc.timestamp));
+            TimeSeries timeSeries = new TimeSeries(doc.cluster, doc.host, tbucket);
+            buckets.compute(timeSeries, (k, v) -> {
+                if (v == null) {
+                    return Tuple.tuple(new Sample(1, doc.cpu), Set.copyOf(doc.project));
+                } else {
+                    Set<String> projects = Sets.union(v.v2(), Sets.newHashSet(doc.project));
+                    return Tuple.tuple(new Sample(v.v1().count + 1, v.v1().sum + doc.cpu), projects);
+                }
+            });
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project, tbucket(1minute)")) {
+            record Key(String project, String tbucket) {
+
+            }
+            Map<Key, Integer> countPerProject = new HashMap<>();
+            Map<Key, Double> sumOfAvgPerProject = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    Key key = new Key(project, e.getKey().tbucket);
+                    countPerProject.merge(key, 1, Integer::sum);
+                    sumOfAvgPerProject.merge(key, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerProject.size()));
+            for (List<Object> r : rows) {
+                Key key = new Key((String) r.get(1), (String) r.get(2));
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerProject.get(key) / countPerProject.get(key);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY tbucket(1minute), project, cluster")) {
+            record Key(String project, String cluster, String tbucket) {
+
+            }
+            Map<Key, Integer> countPerProject = new HashMap<>();
+            Map<Key, Double> sumOfAvgPerProject = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    Key key = new Key(project, e.getKey().cluster, e.getKey().tbucket);
+                    countPerProject.merge(key, 1, Integer::sum);
+                    sumOfAvgPerProject.merge(key, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerProject.size()));
+            for (List<Object> r : rows) {
+                Key key = new Key((String) r.get(2), (String) r.get(3), (String) r.get(1));
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerProject.get(key) / countPerProject.get(key);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+    }
+
 }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java
index 9f3f93aa07d6b..337f83a3dc6e4 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java
@@ -300,7 +300,9 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
             }
         });
         NamedExpression timeBucket = timeBucketRef.get();
-        for (var group : aggregate.groupings()) {
+        boolean[] packPositions = new boolean[aggregate.groupings().size()];
+        for (int i = 0; i < aggregate.groupings().size(); i++) {
+            var group = aggregate.groupings().get(i);
             if (group instanceof Attribute == false) {
                 throw new EsqlIllegalArgumentException("expected named expression for grouping; got " + group);
             }
@@ -312,21 +314,26 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
             } else {
                 var valuesAgg = new Alias(g.source(), g.name(), valuesAggregate(context, g));
                 firstPassAggs.add(valuesAgg);
-                Alias pack = new Alias(
-                    g.source(),
-                    internalNames.next("pack" + g.name()),
-                    new PackDimension(g.source(), valuesAgg.toAttribute())
-                );
-                packDimensions.add(pack);
-                Alias grouping = new Alias(g.source(), internalNames.next("group" + g.name()), pack.toAttribute());
-                secondPassGroupings.add(grouping);
-                Alias unpack = new Alias(
-                    g.source(),
-                    g.name(),
-                    new UnpackDimension(g.source(), grouping.toAttribute(), g.dataType().noText()),
-                    g.id()
-                );
-                unpackDimensions.add(unpack);
+                if (g.isDimension()) {
+                    Alias pack = new Alias(
+                        g.source(),
+                        internalNames.next("pack_" + g.name()),
+                        new PackDimension(g.source(), valuesAgg.toAttribute())
+                    );
+                    packDimensions.add(pack);
+                    Alias grouping = new Alias(g.source(), internalNames.next("group_" + g.name()), pack.toAttribute());
+                    secondPassGroupings.add(grouping);
+                    Alias unpack = new Alias(
+                        g.source(),
+                        g.name(),
+                        new UnpackDimension(g.source(), grouping.toAttribute(), g.dataType().noText()),
+                        g.id()
+                    );
+                    unpackDimensions.add(unpack);
+                    packPositions[i] = true;
+                } else {
+                    secondPassGroupings.add(new Alias(g.source(), g.name(), valuesAgg.toAttribute(), g.id()));
+                }
             }
         }
         LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
@@ -365,13 +372,12 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
         for (NamedExpression agg : secondPassAggs) {
             projects.add(Expressions.attribute(agg));
         }
-        int pos = 0;
-        for (Expression group : secondPassGroupings) {
-            Attribute g = Expressions.attribute(group);
-            if (timeBucket != null && g.id().equals(timeBucket.id())) {
-                projects.add(g);
+        int packPos = 0;
+        for (int i = 0; i < secondPassGroupings.size(); i++) {
+            if (packPositions[i]) {
+                projects.add(unpackDimensions.get(packPos++).toAttribute());
             } else {
-                projects.add(unpackDimensions.get(pos++).toAttribute());
+                projects.add(Expressions.attribute(secondPassGroupings.get(i)));
            }
         }
         return new Project(newChild.source(), unpackValues, projects);
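Not part of the patch: the projection rewrite above is easiest to follow as a standalone sketch. The class and record names below (PackPositionsSketch, Grouping) are made up for illustration and none of this is Elasticsearch code; it only models how a boolean[] aligned with the groupings lets the final projection pull packed (dimension) groupings from the unpacked aliases while non-dimension groupings such as "project" are projected straight from their values alias.

import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the packPositions bookkeeping; not Elasticsearch code.
public class PackPositionsSketch {

    // Hypothetical stand-in for a grouping attribute: a name plus a dimension flag.
    record Grouping(String name, boolean isDimension) {}

    public static void main(String[] args) {
        List<Grouping> groupings = List.of(
            new Grouping("host", true),      // dimension: packed, later unpacked
            new Grouping("project", false),  // non-dimension: passed through untouched
            new Grouping("cluster", true)
        );

        boolean[] packPositions = new boolean[groupings.size()];
        List<String> unpacked = new ArrayList<>();            // one entry per packed grouping, in order
        List<String> secondPassGroupings = new ArrayList<>();

        for (int i = 0; i < groupings.size(); i++) {
            Grouping g = groupings.get(i);
            if (g.isDimension()) {
                // Dimension groupings take the pack/unpack round trip.
                secondPassGroupings.add("group_" + g.name());
                unpacked.add("unpack(" + g.name() + ")");
                packPositions[i] = true;
            } else {
                // Non-dimension groupings are kept as-is.
                secondPassGroupings.add(g.name());
            }
        }

        // Final projection: consume the unpacked list only at packed positions.
        List<String> projection = new ArrayList<>();
        int packPos = 0;
        for (int i = 0; i < secondPassGroupings.size(); i++) {
            if (packPositions[i]) {
                projection.add(unpacked.get(packPos++));
            } else {
                projection.add(secondPassGroupings.get(i));
            }
        }

        // Prints: [unpack(host), project, unpack(cluster)]
        System.out.println(projection);
    }
}

Run on its own, the sketch prints [unpack(host), project, unpack(cluster)]: only host and cluster go through the pack/unpack pair, mirroring what the packPositions array arranges in TranslateTimeSeriesAggregate.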