diff --git a/docs/changelog/116755.yaml b/docs/changelog/116755.yaml new file mode 100644 index 0000000000000..3aa5ec8580b59 --- /dev/null +++ b/docs/changelog/116755.yaml @@ -0,0 +1,5 @@ +pr: 116755 +summary: Smarter field caps with subscribable listener +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java new file mode 100644 index 0000000000000..c2ba502b92554 --- /dev/null +++ b/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.qa.multi_node; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase; +import org.junit.ClassRule; + +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase { + + @ClassRule + public static ElasticsearchCluster cluster = Clusters.testCluster(ignored -> {}); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } +} diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java new file mode 100644 index 0000000000000..f13bcd618f0a8 --- /dev/null +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.qa.single_node; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase; +import org.junit.ClassRule; + +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase { + + @ClassRule + public static ElasticsearchCluster cluster = Clusters.testCluster(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } +} diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java new file mode 100644 index 0000000000000..3314430d63eaa --- /dev/null +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -0,0 +1,284 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.qa.rest; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.esql.AssertWarnings; +import org.junit.After; +import org.junit.Assert; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.test.ListMatcher.matchesList; +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.entityToMap; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.nullValue; + +public abstract class RequestIndexFilteringTestCase extends ESRestTestCase { + + @After + public void wipeTestData() throws IOException { + try { + var response = client().performRequest(new Request("DELETE", "/test*")); + assertEquals(200, response.getStatusLine().getStatusCode()); + } catch (ResponseException re) { + assertEquals(404, re.getResponse().getStatusLine().getStatusCode()); + } + } + + public void testTimestampFilterFromQuery() throws IOException { + int docsTest1 = 50; + int docsTest2 = 30; + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + indexTimestampData(docsTest2, "test2", "2023-11-26", "id2"); + + // filter includes both indices in the result (all columns, all rows) + RestEsqlTestCase.RequestObjectBuilder builder = timestampFilter("gte", "2023-01-01").query("FROM test*"); + Map result = runEsql(builder); + assertMap( + result, + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "id2").entry("type", "integer")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1 + docsTest2))).entry("took", greaterThanOrEqualTo(0)) + ); + + // filter includes only test1. Columns from test2 are filtered out, as well (not only rows)! + builder = timestampFilter("gte", "2024-01-01").query("FROM test*"); + assertMap( + runEsql(builder), + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1))).entry("took", greaterThanOrEqualTo(0)) + ); + + // filter excludes both indices (no rows); the first analysis step fails because there are no columns, a second attempt succeeds + // after eliminating the index filter. All columns are returned. + builder = timestampFilter("gte", "2025-01-01").query("FROM test*"); + assertMap( + runEsql(builder), + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "id2").entry("type", "integer")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + ).entry("values", allOf(instanceOf(List.class), hasSize(0))).entry("took", greaterThanOrEqualTo(0)) + ); + } + + public void testFieldExistsFilter_KeepWildcard() throws IOException { + int docsTest1 = randomIntBetween(0, 10); + int docsTest2 = randomIntBetween(0, 10); + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + indexTimestampData(docsTest2, "test2", "2023-11-26", "id2"); + + // filter includes only test1. Columns are rows of test2 are filtered out + RestEsqlTestCase.RequestObjectBuilder builder = existsFilter("id1").query("FROM test*"); + Map result = runEsql(builder); + assertMap( + result, + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1))).entry("took", greaterThanOrEqualTo(0)) + ); + + // filter includes only test1. Columns from test2 are filtered out, as well (not only rows)! + builder = existsFilter("id1").query("FROM test* METADATA _index | KEEP _index, id*"); + result = runEsql(builder); + assertMap( + result, + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "_index").entry("type", "keyword")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1))).entry("took", greaterThanOrEqualTo(0)) + ); + @SuppressWarnings("unchecked") + var values = (List>) result.get("values"); + for (List row : values) { + assertThat(row.get(0), equalTo("test1")); + assertThat(row.get(1), instanceOf(Integer.class)); + } + } + + public void testFieldExistsFilter_With_ExplicitUseOfDiscardedIndexFields() throws IOException { + int docsTest1 = randomIntBetween(1, 5); + int docsTest2 = randomIntBetween(0, 5); + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + indexTimestampData(docsTest2, "test2", "2023-11-26", "id2"); + + // test2 is explicitly used in a query with "SORT id2" even if the index filter should discard test2 + RestEsqlTestCase.RequestObjectBuilder builder = existsFilter("id1").query( + "FROM test* METADATA _index | SORT id2 | KEEP _index, id*" + ); + Map result = runEsql(builder); + assertMap( + result, + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "_index").entry("type", "keyword")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "id2").entry("type", "integer")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1))).entry("took", greaterThanOrEqualTo(0)) + ); + @SuppressWarnings("unchecked") + var values = (List>) result.get("values"); + for (List row : values) { + assertThat(row.get(0), equalTo("test1")); + assertThat(row.get(1), instanceOf(Integer.class)); + assertThat(row.get(2), nullValue()); + } + } + + public void testFieldNameTypo() throws IOException { + int docsTest1 = randomIntBetween(0, 5); + int docsTest2 = randomIntBetween(0, 5); + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + indexTimestampData(docsTest2, "test2", "2023-11-26", "id2"); + + // idx field name is explicitly used, though it doesn't exist in any of the indices. First test - without filter + ResponseException e = expectThrows( + ResponseException.class, + () -> runEsql(requestObjectBuilder().query("FROM test* | WHERE idx == 123")) + ); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]")); + + e = expectThrows(ResponseException.class, () -> runEsql(requestObjectBuilder().query("FROM test1 | WHERE idx == 123"))); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]")); + + e = expectThrows( + ResponseException.class, + () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test* | WHERE idx == 123")) + ); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]")); + + e = expectThrows( + ResponseException.class, + () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test2 | WHERE idx == 123")) + ); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]")); + } + + public void testIndicesDontExist() throws IOException { + int docsTest1 = 0; // we are interested only in the created index, not necessarily that it has data + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + + ResponseException e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo"))); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("Unknown index [foo]")); + + e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo*"))); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("Unknown index [foo*]")); + + e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo,test1"))); + assertEquals(404, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("index_not_found_exception")); + assertThat(e.getMessage(), containsString("no such index [foo]")); + } + + private static RestEsqlTestCase.RequestObjectBuilder timestampFilter(String op, String date) throws IOException { + return requestObjectBuilder().filter(b -> { + b.startObject("range"); + { + b.startObject("@timestamp").field(op, date).endObject(); + } + b.endObject(); + }); + } + + private static RestEsqlTestCase.RequestObjectBuilder existsFilter(String field) throws IOException { + return requestObjectBuilder().filter(b -> b.startObject("exists").field("field", field).endObject()); + } + + public Map runEsql(RestEsqlTestCase.RequestObjectBuilder requestObject) throws IOException { + return RestEsqlTestCase.runEsql(requestObject, new AssertWarnings.NoWarnings(), RestEsqlTestCase.Mode.SYNC); + } + + protected void indexTimestampData(int docs, String indexName, String date, String differentiatorFieldName) throws IOException { + Request createIndex = new Request("PUT", indexName); + createIndex.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 3 + } + }, + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "%differentiator_field_name%": { + "type": "integer" + } + } + } + }""".replace("%differentiator_field_name%", differentiatorFieldName)); + Response response = client().performRequest(createIndex); + assertThat( + entityToMap(response.getEntity(), XContentType.JSON), + matchesMap().entry("shards_acknowledged", true).entry("index", indexName).entry("acknowledged", true) + ); + + if (docs > 0) { + StringBuilder b = new StringBuilder(); + for (int i = 0; i < docs; i++) { + b.append(String.format(Locale.ROOT, """ + {"create":{"_index":"%s"}} + {"@timestamp":"%s","value":%d,"%s":%d} + """, indexName, date, i, differentiatorFieldName, i)); + } + Request bulk = new Request("POST", "/_bulk"); + bulk.addParameter("refresh", "true"); + bulk.addParameter("filter_path", "errors"); + bulk.setJsonEntity(b.toString()); + response = client().performRequest(bulk); + Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + } + } +} diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEnrichTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEnrichTestCase.java index def6491fb920f..bf4a4400e13cf 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEnrichTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEnrichTestCase.java @@ -12,7 +12,9 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentBuilder; import org.junit.After; import org.junit.Before; @@ -29,7 +31,6 @@ public abstract class RestEnrichTestCase extends ESRestTestCase { private static final String sourceIndexName = "countries"; - private static final String testIndexName = "test"; private static final String policyName = "countries"; public enum Mode { @@ -56,7 +57,7 @@ public void assertRequestBreakerEmpty() throws Exception { @Before public void loadTestData() throws IOException { - Request request = new Request("PUT", "/" + testIndexName); + Request request = new Request("PUT", "/test1"); request.setJsonEntity(""" { "mappings": { @@ -72,7 +73,7 @@ public void loadTestData() throws IOException { }"""); assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); - request = new Request("POST", "/" + testIndexName + "/_bulk"); + request = new Request("POST", "/test1/_bulk"); request.addParameter("refresh", "true"); request.setJsonEntity(""" { "index": {"_id": 1} } @@ -84,6 +85,34 @@ public void loadTestData() throws IOException { """); assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + request = new Request("PUT", "/test2"); + request.setJsonEntity(""" + { + "mappings": { + "properties": { + "geo.dest": { + "type": "keyword" + }, + "country_number": { + "type": "long" + } + } + } + }"""); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + + request = new Request("POST", "/test2/_bulk"); + request.addParameter("refresh", "true"); + request.setJsonEntity(""" + { "index": {"_id": 1} } + { "geo.dest": "IN", "country_number": 2 } + { "index": {"_id": 2} } + { "geo.dest": "IN", "country_number": 2 } + { "index": {"_id": 3} } + { "geo.dest": "US", "country_number": 3 } + """); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + request = new Request("PUT", "/" + sourceIndexName); request.setJsonEntity(""" { @@ -131,7 +160,7 @@ public void loadTestData() throws IOException { @After public void wipeTestData() throws IOException { try { - var response = client().performRequest(new Request("DELETE", "/" + testIndexName)); + var response = client().performRequest(new Request("DELETE", "/test1,test2")); assertEquals(200, response.getStatusLine().getStatusCode()); response = client().performRequest(new Request("DELETE", "/" + sourceIndexName)); assertEquals(200, response.getStatusLine().getStatusCode()); @@ -143,7 +172,7 @@ public void wipeTestData() throws IOException { } public void testNonExistentEnrichPolicy() throws IOException { - ResponseException re = expectThrows(ResponseException.class, () -> runEsql("from test | enrich countris", Mode.SYNC)); + ResponseException re = expectThrows(ResponseException.class, () -> runEsql("from test1 | enrich countris", null, Mode.SYNC)); assertThat( EntityUtils.toString(re.getResponse().getEntity()), containsString("cannot find enrich policy [countris], did you mean [countries]?") @@ -151,7 +180,10 @@ public void testNonExistentEnrichPolicy() throws IOException { } public void testNonExistentEnrichPolicy_KeepField() throws IOException { - ResponseException re = expectThrows(ResponseException.class, () -> runEsql("from test | enrich countris | keep number", Mode.SYNC)); + ResponseException re = expectThrows( + ResponseException.class, + () -> runEsql("from test1 | enrich countris | keep number", null, Mode.SYNC) + ); assertThat( EntityUtils.toString(re.getResponse().getEntity()), containsString("cannot find enrich policy [countris], did you mean [countries]?") @@ -159,25 +191,147 @@ public void testNonExistentEnrichPolicy_KeepField() throws IOException { } public void testMatchField_ImplicitFieldsList() throws IOException { - Map result = runEsql("from test | enrich countries | keep number | sort number"); + Map result = runEsql("from test1 | enrich countries | keep number | sort number"); var columns = List.of(Map.of("name", "number", "type", "long")); var values = List.of(List.of(1000), List.of(1000), List.of(5000)); assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); } public void testMatchField_ImplicitFieldsList_WithStats() throws IOException { - Map result = runEsql("from test | enrich countries | stats s = sum(number) by country_name"); + Map result = runEsql("from test1 | enrich countries | stats s = sum(number) by country_name"); var columns = List.of(Map.of("name", "s", "type", "long"), Map.of("name", "country_name", "type", "keyword")); var values = List.of(List.of(2000, "United States of America"), List.of(5000, "China")); assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); } + public void testSimpleIndexFilteringWithEnrich() throws IOException { + // no filter + Map result = runEsql(""" + from test* metadata _index + | enrich countries + | keep *number, geo.dest, _index + | sort geo.dest, _index + """); + var columns = List.of( + Map.of("name", "country_number", "type", "long"), + Map.of("name", "number", "type", "long"), + Map.of("name", "geo.dest", "type", "keyword"), + Map.of("name", "_index", "type", "keyword") + ); + var values = List.of( + Arrays.asList(null, 5000, "CN", "test1"), + Arrays.asList(2, null, "IN", "test2"), + Arrays.asList(2, null, "IN", "test2"), + Arrays.asList(null, 1000, "US", "test1"), + Arrays.asList(null, 1000, "US", "test1"), + Arrays.asList(3, null, "US", "test2") + ); + assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + + // filter something that won't affect the columns + result = runEsql(""" + from test* metadata _index + | enrich countries + | keep *number, geo.dest, _index + | sort geo.dest, _index + """, b -> b.startObject("exists").field("field", "foobar").endObject()); + assertMap(result, matchesMap().entry("columns", columns).entry("values", List.of()).entry("took", greaterThanOrEqualTo(0))); + } + + public void testIndexFilteringWithEnrich_RemoveOneIndex() throws IOException { + // filter out test2 but specifically use one of its fields in the query (country_number) + Map result = runEsql(""" + from test* metadata _index + | enrich countries + | keep country_number, number, geo.dest, _index + | sort geo.dest, _index + """, b -> b.startObject("exists").field("field", "number").endObject()); + + var columns = List.of( + Map.of("name", "country_number", "type", "long"), + Map.of("name", "number", "type", "long"), + Map.of("name", "geo.dest", "type", "keyword"), + Map.of("name", "_index", "type", "keyword") + ); + var values = List.of( + Arrays.asList(null, 5000, "CN", "test1"), + Arrays.asList(null, 1000, "US", "test1"), + Arrays.asList(null, 1000, "US", "test1") + ); + + assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + + // filter out test2 and use a wildcarded field name in the "keep" command + result = runEsql(""" + from test* metadata _index + | enrich countries + | keep *number, geo.dest, _index + | sort geo.dest, _index + """, b -> b.startObject("exists").field("field", "number").endObject()); + + columns = List.of( + Map.of("name", "number", "type", "long"), + Map.of("name", "geo.dest", "type", "keyword"), + Map.of("name", "_index", "type", "keyword") + ); + values = List.of(Arrays.asList(5000, "CN", "test1"), Arrays.asList(1000, "US", "test1"), Arrays.asList(1000, "US", "test1")); + assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + } + + public void testIndexFilteringWithEnrich_ExpectException() throws IOException { + // no filter, just a simple query with "enrich" that should throw a valid VerificationException + ResponseException e = expectThrows(ResponseException.class, () -> runEsql(""" + from test* metadata _index + | enrich countries + | where foobar == 123 + """)); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 3:13: Unknown column [foobar]")); + + // same query, but with a filter this time + e = expectThrows(ResponseException.class, () -> runEsql(""" + from test* metadata _index + | enrich countries + | where foobar == 123 + """, b -> b.startObject("exists").field("field", "number").endObject())); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 3:13: Unknown column [foobar]")); + } + + public void testIndexFilteringWithEnrich_FilterUnusedIndexFields() throws IOException { + // filter out "test1". The field that is specific to "test1" ("number") is not actually used in the query + Map result = runEsql(""" + from test* metadata _index + | enrich countries + | keep country_number, geo.dest, _index + | sort geo.dest, _index + """, b -> b.startObject("exists").field("field", "country_number").endObject()); + + var columns = List.of( + Map.of("name", "country_number", "type", "long"), + Map.of("name", "geo.dest", "type", "keyword"), + Map.of("name", "_index", "type", "keyword") + ); + var values = List.of(Arrays.asList(2, "IN", "test2"), Arrays.asList(2, "IN", "test2"), Arrays.asList(3, "US", "test2")); + assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + } + private Map runEsql(String query) throws IOException { - return runEsql(query, mode); + return runEsql(query, null, mode); } - private Map runEsql(String query, Mode mode) throws IOException { - var requestObject = new RestEsqlTestCase.RequestObjectBuilder().query(query); + private Map runEsql(String query, CheckedConsumer filter) throws IOException { + return runEsql(query, filter, mode); + } + + private Map runEsql(String query, CheckedConsumer filter, Mode mode) throws IOException { + var requestObject = new RestEsqlTestCase.RequestObjectBuilder(); + if (filter != null) { + requestObject.filter(filter); + } + requestObject.query(query); if (mode == Mode.ASYNC) { return RestEsqlTestCase.runEsqlAsync(requestObject); } else { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index c8a7a6bcc4e98..c8e993b7dbf0b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -411,7 +411,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas } try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) { String indexName = EnrichPolicy.getBaseName(policyName); - indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, refs.acquire(indexResult -> { + indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, refs.acquire(indexResult -> { if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) { EsIndex esIndex = indexResult.get(); var concreteIndices = Map.of(request.clusterAlias, Iterables.get(esIndex.concreteIndices(), 0)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 3d1ed8f70eae0..71fba5683644d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.collect.Iterators; @@ -25,6 +26,7 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.analysis.Analyzer; @@ -151,6 +153,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P analyzedPlan( parse(request.query(), request.params()), executionInfo, + request.filter(), new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { @@ -268,31 +271,28 @@ private LogicalPlan parse(String query, QueryParams params) { return parsed; } - public void analyzedPlan(LogicalPlan parsed, EsqlExecutionInfo executionInfo, ActionListener listener) { + public void analyzedPlan( + LogicalPlan parsed, + EsqlExecutionInfo executionInfo, + QueryBuilder requestFilter, + ActionListener logicalPlanListener + ) { if (parsed.analyzed()) { - listener.onResponse(parsed); + logicalPlanListener.onResponse(parsed); return; } - preAnalyze(parsed, executionInfo, (indices, lookupIndices, policies) -> { + TriFunction analyzeAction = (indices, lookupIndices, policies) -> { planningMetrics.gatherPreAnalysisMetrics(parsed); Analyzer analyzer = new Analyzer( new AnalyzerContext(configuration, functionRegistry, indices, lookupIndices, policies), verifier ); - var plan = analyzer.analyze(parsed); + LogicalPlan plan = analyzer.analyze(parsed); plan.setAnalyzed(); - LOGGER.debug("Analyzed plan:\n{}", plan); return plan; - }, listener); - } + }; - private void preAnalyze( - LogicalPlan parsed, - EsqlExecutionInfo executionInfo, - TriFunction action, - ActionListener listener - ) { PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed); var unresolvedPolicies = preAnalysis.enriches.stream() .map(e -> new EnrichPolicyResolver.UnresolvedPolicy((String) e.policyName().fold(), e.mode())) @@ -302,81 +302,113 @@ private void preAnalyze( final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( indices.stream().flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))).toArray(String[]::new) ).keySet(); - enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, listener.delegateFailureAndWrap((l, enrichResolution) -> { - // first we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API - var enrichMatchFields = enrichResolution.resolvedEnrichPolicies() - .stream() - .map(ResolvedEnrichPolicy::matchField) - .collect(Collectors.toSet()); - // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy - var fieldNames = fieldNames(parsed, enrichMatchFields); - // First resolve the lookup indices, then the main indices - preAnalyzeLookupIndices( - preAnalysis.lookupIndices, + + SubscribableListener.newForked(l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)) + .andThen((l, enrichResolution) -> { + // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API + var enrichMatchFields = enrichResolution.resolvedEnrichPolicies() + .stream() + .map(ResolvedEnrichPolicy::matchField) + .collect(Collectors.toSet()); + // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy + var fieldNames = fieldNames(parsed, enrichMatchFields); + ListenerResult listenerResult = new ListenerResult(null, null, enrichResolution, fieldNames); + + // first resolve the lookup indices, then the main indices + preAnalyzeLookupIndices(preAnalysis.lookupIndices, listenerResult, l); + }) + .andThen((l, listenerResult) -> { + // resolve the main indices + preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, requestFilter, l); + }) + .andThen((l, listenerResult) -> { + // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for + // invalid index resolution to updateExecutionInfo + if (listenerResult.indices.isValid()) { + // CCS indices and skip_unavailable cluster values can stop the analysis right here + if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, listenerResult, logicalPlanListener, l)) + return; + } + // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step + l.onResponse(listenerResult); + }) + .andThen((l, listenerResult) -> { + // first attempt (maybe the only one) at analyzing the plan + analyzeAndMaybeRetry(analyzeAction, requestFilter, listenerResult, logicalPlanListener, l); + }) + .andThen((l, listenerResult) -> { + assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; + + // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices + // resolving one more time (the first attempt failed and the query had a filter) + for (String clusterAlias : executionInfo.clusterAliases()) { + executionInfo.swapCluster(clusterAlias, (k, v) -> null); + } + + // here the requestFilter is set to null, performing the pre-analysis after the first step failed + preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, null, l); + }) + .andThen((l, listenerResult) -> { + assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; + LOGGER.debug("Analyzing the plan (second attempt, without filter)"); + LogicalPlan plan; + try { + plan = analyzeAction.apply(listenerResult.indices, listenerResult.lookupIndices, listenerResult.enrichResolution); + } catch (Exception e) { + l.onFailure(e); + return; + } + LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan); + l.onResponse(plan); + }) + .addListener(logicalPlanListener); + } + + private void preAnalyzeLookupIndices(List indices, ListenerResult listenerResult, ActionListener listener) { + if (indices.size() > 1) { + // Note: JOINs on more than one index are not yet supported + listener.onFailure(new MappingException("More than one LOOKUP JOIN is not supported")); + } else if (indices.size() == 1) { + TableInfo tableInfo = indices.get(0); + TableIdentifier table = tableInfo.id(); + // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types + indexResolver.resolveAsMergedMapping( + table.index(), Set.of("*"), // Current LOOKUP JOIN syntax does not allow for field selection - l.delegateFailureAndWrap( - (lx, lookupIndexResolution) -> preAnalyzeIndices( - indices, - executionInfo, - enrichResolution.getUnavailableClusters(), - fieldNames, - lx.delegateFailureAndWrap((ll, indexResolution) -> { - // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid - // index resolution to updateExecutionInfo - if (indexResolution.isValid()) { - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters( - executionInfo, - indexResolution.unavailableClusters() - ); - if (executionInfo.isCrossClusterSearch() - && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { - // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel - // Exception to let the LogicalPlanActionListener decide how to proceed - ll.onFailure(new NoClustersToSearchException()); - return; - } - - Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( - indexResolution.get().concreteIndices().toArray(String[]::new) - ).keySet(); - // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again - // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies - // again. - // TODO: add a test for this - if (targetClusters.containsAll(newClusters) == false - // do not bother with a re-resolution if only remotes were requested and all were offline - && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { - enrichPolicyResolver.resolvePolicies( - newClusters, - unresolvedPolicies, - ll.map( - newEnrichResolution -> action.apply(indexResolution, lookupIndexResolution, newEnrichResolution) - ) - ); - return; - } - } - ll.onResponse(action.apply(indexResolution, lookupIndexResolution, enrichResolution)); - }) - ) - ) + null, + listener.map(indexResolution -> listenerResult.withLookupIndexResolution(indexResolution)) ); - })); + } else { + try { + // No lookup indices specified + listener.onResponse( + new ListenerResult( + listenerResult.indices, + IndexResolution.invalid("[none specified]"), + listenerResult.enrichResolution, + listenerResult.fieldNames + ) + ); + } catch (Exception ex) { + listener.onFailure(ex); + } + } } private void preAnalyzeIndices( List indices, EsqlExecutionInfo executionInfo, - Map unavailableClusters, // known to be unavailable from the enrich policy API call - Set fieldNames, - ActionListener listener + ListenerResult listenerResult, + QueryBuilder requestFilter, + ActionListener listener ) { // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one if (indices.size() > 1) { // Note: JOINs are not supported but we detect them when listener.onFailure(new MappingException("Queries with multiple indices are not supported")); } else if (indices.size() == 1) { + // known to be unavailable from the enrich policy API call + Map unavailableClusters = listenerResult.enrichResolution.getUnavailableClusters(); TableInfo tableInfo = indices.get(0); TableIdentifier table = tableInfo.id(); @@ -409,38 +441,116 @@ private void preAnalyzeIndices( String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution - listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of()))); + listener.onResponse( + new ListenerResult( + IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())), + listenerResult.lookupIndices, + listenerResult.enrichResolution, + listenerResult.fieldNames + ) + ); } else { // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types - indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener); + indexResolver.resolveAsMergedMapping( + indexExpressionToResolve, + listenerResult.fieldNames, + requestFilter, + listener.map(indexResolution -> listenerResult.withIndexResolution(indexResolution)) + ); } } else { try { // occurs when dealing with local relations (row a = 1) - listener.onResponse(IndexResolution.invalid("[none specified]")); + listener.onResponse( + new ListenerResult( + IndexResolution.invalid("[none specified]"), + listenerResult.lookupIndices, + listenerResult.enrichResolution, + listenerResult.fieldNames + ) + ); } catch (Exception ex) { listener.onFailure(ex); } } } - private void preAnalyzeLookupIndices(List indices, Set fieldNames, ActionListener listener) { - if (indices.size() > 1) { - // Note: JOINs on more than one index are not yet supported - listener.onFailure(new MappingException("More than one LOOKUP JOIN is not supported")); - } else if (indices.size() == 1) { - TableInfo tableInfo = indices.get(0); - TableIdentifier table = tableInfo.id(); - // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types - indexResolver.resolveAsMergedMapping(table.index(), fieldNames, listener); - } else { - try { - // No lookup indices specified - listener.onResponse(IndexResolution.invalid("[none specified]")); - } catch (Exception ex) { - listener.onFailure(ex); + private boolean analyzeCCSIndices( + EsqlExecutionInfo executionInfo, + Set targetClusters, + Set unresolvedPolicies, + ListenerResult listenerResult, + ActionListener logicalPlanListener, + ActionListener l + ) { + IndexResolution indexResolution = listenerResult.indices; + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); + if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { + // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception + // to let the LogicalPlanActionListener decide how to proceed + logicalPlanListener.onFailure(new NoClustersToSearchException()); + return true; + } + + Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( + indexResolution.get().concreteIndices().toArray(String[]::new) + ).keySet(); + // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again + // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again. + // TODO: add a test for this + if (targetClusters.containsAll(newClusters) == false + // do not bother with a re-resolution if only remotes were requested and all were offline + && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { + enrichPolicyResolver.resolvePolicies( + newClusters, + unresolvedPolicies, + l.map(enrichResolution -> listenerResult.withEnrichResolution(enrichResolution)) + ); + return true; + } + return false; + } + + private static void analyzeAndMaybeRetry( + TriFunction analyzeAction, + QueryBuilder requestFilter, + ListenerResult listenerResult, + ActionListener logicalPlanListener, + ActionListener l + ) { + LogicalPlan plan = null; + var filterPresentMessage = requestFilter == null ? "without" : "with"; + var attemptMessage = requestFilter == null ? "the only" : "first"; + LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage); + + try { + plan = analyzeAction.apply(listenerResult.indices, listenerResult.lookupIndices, listenerResult.enrichResolution); + } catch (Exception e) { + if (e instanceof VerificationException ve) { + LOGGER.debug( + "Analyzing the plan ({} attempt, {} filter) failed with {}", + attemptMessage, + filterPresentMessage, + ve.getDetailedMessage() + ); + if (requestFilter == null) { + // if the initial request didn't have a filter, then just pass the exception back to the user + logicalPlanListener.onFailure(ve); + } else { + // interested only in a VerificationException, but this time we are taking out the index filter + // to try and make the index resolution work without any index filtering. In the next step... to be continued + l.onResponse(listenerResult); + } + } else { + // if the query failed with any other type of exception, then just pass the exception back to the user + logicalPlanListener.onFailure(e); } + return; } + LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan); + // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning + logicalPlanListener.onResponse(plan); } static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchFields) { @@ -591,4 +701,23 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { LOGGER.debug("Optimized physical plan:\n{}", plan); return plan; } + + private record ListenerResult( + IndexResolution indices, + IndexResolution lookupIndices, + EnrichResolution enrichResolution, + Set fieldNames + ) { + ListenerResult withEnrichResolution(EnrichResolution newEnrichResolution) { + return new ListenerResult(indices(), lookupIndices(), newEnrichResolution, fieldNames()); + } + + ListenerResult withIndexResolution(IndexResolution newIndexResolution) { + return new ListenerResult(newIndexResolution, lookupIndices(), enrichResolution(), fieldNames()); + } + + ListenerResult withLookupIndexResolution(IndexResolution newIndexResolution) { + return new ListenerResult(indices(), newIndexResolution, enrichResolution(), fieldNames()); + } + }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index f61be4b59830e..d000b2765e2b1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -76,10 +77,15 @@ public IndexResolver(Client client) { /** * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. */ - public void resolveAsMergedMapping(String indexWildcard, Set fieldNames, ActionListener listener) { + public void resolveAsMergedMapping( + String indexWildcard, + Set fieldNames, + QueryBuilder requestFilter, + ActionListener listener + ) { client.execute( EsqlResolveFieldsAction.TYPE, - createFieldCapsRequest(indexWildcard, fieldNames), + createFieldCapsRequest(indexWildcard, fieldNames, requestFilter), listener.delegateFailureAndWrap((l, response) -> l.onResponse(mergedMappings(indexWildcard, response))) ); } @@ -252,10 +258,11 @@ private EsField conflictingMetricTypes(String name, String fullName, FieldCapabi return new InvalidMappedField(name, "mapped as different metric types in indices: " + indices); } - private static FieldCapabilitiesRequest createFieldCapsRequest(String index, Set fieldNames) { + private static FieldCapabilitiesRequest createFieldCapsRequest(String index, Set fieldNames, QueryBuilder requestFilter) { FieldCapabilitiesRequest req = new FieldCapabilitiesRequest().indices(Strings.commaDelimitedListToStringArray(index)); req.fields(fieldNames.toArray(String[]::new)); req.includeUnmapped(true); + req.indexFilter(requestFilter); // lenient because we throw our own errors looking at the response e.g. if something was not resolved // also because this way security doesn't throw authorization exceptions but rather honors ignore_unavailable req.indicesOptions(FIELD_CAPS_INDICES_OPTIONS);