From ae8962f5a246da0431fa7525e921425d95a8be41 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Mon, 28 Oct 2024 13:31:15 +0200 Subject: [PATCH 01/17] WIP --- .../esql/enrich/EnrichPolicyResolver.java | 2 +- .../xpack/esql/session/EsqlSession.java | 31 ++++++++++++++----- .../xpack/esql/session/IndexResolver.java | 9 ++++-- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index e67c406e26929..f780a6df47887 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -364,7 +364,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas } try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) { String indexName = EnrichPolicy.getBaseName(policyName); - indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, refs.acquire(indexResult -> { + indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, refs.acquire(indexResult -> { if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) { EsIndex esIndex = indexResult.get(); var concreteIndices = Map.of(request.clusterAlias, Iterables.get(esIndex.concreteIndices(), 0)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index ccd167942340c..1e19e5ca53a0b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -23,6 +23,7 @@ import 
org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.analysis.Analyzer; @@ -145,7 +146,8 @@ public void execute( executionInfo, listener.delegateFailureAndWrap( (next, analyzedPlan) -> executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(analyzedPlan), next) - ) + ), + request.filter() ); } @@ -205,7 +207,8 @@ private LogicalPlan parse(String query, QueryParams params) { return parsed; } - public void analyzedPlan(LogicalPlan parsed, EsqlExecutionInfo executionInfo, ActionListener listener) { + public void analyzedPlan(LogicalPlan parsed, EsqlExecutionInfo executionInfo, ActionListener listener, + QueryBuilder requestFilter) { if (parsed.analyzed()) { listener.onResponse(parsed); return; @@ -214,18 +217,29 @@ public void analyzedPlan(LogicalPlan parsed, EsqlExecutionInfo executionInfo, Ac preAnalyze(parsed, executionInfo, (indices, policies) -> { planningMetrics.gatherPreAnalysisMetrics(parsed); Analyzer analyzer = new Analyzer(new AnalyzerContext(configuration, functionRegistry, indices, policies), verifier); - var plan = analyzer.analyze(parsed); + + LogicalPlan plan; + /*if (requestFilter != null) { + try {*/ + plan = analyzer.analyze(parsed); + /* } catch (VerificationException ve) { + + } + } else { + plan = analyzer.analyze(parsed); + }*/ plan.setAnalyzed(); LOGGER.debug("Analyzed plan:\n{}", plan); return plan; - }, listener); + }, listener, requestFilter); } private void preAnalyze( LogicalPlan parsed, EsqlExecutionInfo executionInfo, BiFunction action, - ActionListener listener + ActionListener listener, + QueryBuilder requestFilter ) { PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed); var unresolvedPolicies = 
preAnalysis.enriches.stream() @@ -262,7 +276,7 @@ private void preAnalyze( } } ll.onResponse(action.apply(indexResolution, enrichResolution)); - }), matchFields); + }), matchFields, requestFilter); })); } @@ -270,7 +284,8 @@ private void preAnalyzeIndices( LogicalPlan parsed, EsqlExecutionInfo executionInfo, ActionListener listener, - Set enrichPolicyMatchFields + Set enrichPolicyMatchFields, + QueryBuilder requestFilter ) { PreAnalyzer.PreAnalysis preAnalysis = new PreAnalyzer().preAnalyze(parsed); // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one @@ -291,7 +306,7 @@ private void preAnalyzeIndices( return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); }); } - indexResolver.resolveAsMergedMapping(table.index(), fieldNames, listener); + indexResolver.resolveAsMergedMapping(table.index(), fieldNames, requestFilter, listener); } else { try { // occurs when dealing with local relations (row a = 1) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index c0f94bccc50a4..d82970a35e1cf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.TimeSeriesParams; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; @@ -78,10 +79,11 @@ public IndexResolver(Client client) { /** * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. 
*/ - public void resolveAsMergedMapping(String indexWildcard, Set fieldNames, ActionListener listener) { + public void resolveAsMergedMapping(String indexWildcard, Set fieldNames, QueryBuilder requestFilter, + ActionListener listener) { client.execute( EsqlResolveFieldsAction.TYPE, - createFieldCapsRequest(indexWildcard, fieldNames), + createFieldCapsRequest(indexWildcard, fieldNames, requestFilter), listener.delegateFailureAndWrap((l, response) -> l.onResponse(mergedMappings(indexWildcard, response))) ); } @@ -284,10 +286,11 @@ private EsField conflictingMetricTypes(String name, String fullName, FieldCapabi return new InvalidMappedField(name, "mapped as different metric types in indices: " + indices); } - private static FieldCapabilitiesRequest createFieldCapsRequest(String index, Set fieldNames) { + private static FieldCapabilitiesRequest createFieldCapsRequest(String index, Set fieldNames, QueryBuilder requestFilter) { FieldCapabilitiesRequest req = new FieldCapabilitiesRequest().indices(Strings.commaDelimitedListToStringArray(index)); req.fields(fieldNames.toArray(String[]::new)); req.includeUnmapped(true); + req.indexFilter(requestFilter); // lenient because we throw our own errors looking at the response e.g. 
if something was not resolved // also because this way security doesn't throw authorization exceptions but rather honors ignore_unavailable req.indicesOptions(FIELD_CAPS_INDICES_OPTIONS); From b91df0b3ca0e6e7144d79668b3472f763ac0a27a Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Mon, 4 Nov 2024 12:09:45 +0200 Subject: [PATCH 02/17] wip --- .../xpack/esql/session/EsqlSession.java | 198 ++++++++++++++---- .../xpack/esql/session/IndexResolver.java | 8 +- 2 files changed, 159 insertions(+), 47 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 827a3e3677419..1c5b414683e5c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; @@ -19,6 +20,7 @@ import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; @@ -178,7 +180,7 @@ public void executeOptimizedPlan( executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener); } - private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {}; + private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {} private void executeSubPlans( PhysicalPlan physicalPlan, 
@@ -268,40 +270,26 @@ private LogicalPlan parse(String query, QueryParams params) { return parsed; } - public void analyzedPlan(LogicalPlan parsed, EsqlExecutionInfo executionInfo, ActionListener listener, - QueryBuilder requestFilter) { + public void analyzedPlan( + LogicalPlan parsed, + EsqlExecutionInfo executionInfo, + ActionListener finalListener, + QueryBuilder requestFilter + ) { if (parsed.analyzed()) { - listener.onResponse(parsed); + finalListener.onResponse(parsed); return; } - preAnalyze(parsed, executionInfo, (indices, policies) -> { + BiFunction analyzeAction = (indices, policies) -> { planningMetrics.gatherPreAnalysisMetrics(parsed); Analyzer analyzer = new Analyzer(new AnalyzerContext(configuration, functionRegistry, indices, policies), verifier); - LogicalPlan plan; - /*if (requestFilter != null) { - try {*/ - plan = analyzer.analyze(parsed); - /* } catch (VerificationException ve) { - - } - } else { - plan = analyzer.analyze(parsed); - }*/ + LogicalPlan plan = analyzer.analyze(parsed); plan.setAnalyzed(); - LOGGER.debug("Analyzed plan:\n{}", plan); return plan; - }, listener, requestFilter); - } + }; - private void preAnalyze( - LogicalPlan parsed, - EsqlExecutionInfo executionInfo, - BiFunction action, - ActionListener listener, - QueryBuilder requestFilter - ) { PreAnalyzer.PreAnalysis preAnalysis = preAnalyzer.preAnalyze(parsed); var unresolvedPolicies = preAnalysis.enriches.stream() .map(e -> new EnrichPolicyResolver.UnresolvedPolicy((String) e.policyName().fold(), e.mode())) @@ -311,14 +299,19 @@ private void preAnalyze( .flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))) .toArray(String[]::new) ).keySet(); - enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, listener.delegateFailureAndWrap((l, enrichResolution) -> { - // first we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API - var matchFields = 
enrichResolution.resolvedEnrichPolicies() - .stream() - .map(ResolvedEnrichPolicy::matchField) - .collect(Collectors.toSet()); - Map unavailableClusters = enrichResolution.getUnavailableClusters(); - preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> { + + SubscribableListener.newForked(l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)) + .>andThen((l, enrichResolution) -> { + // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API + var matchFields = enrichResolution.resolvedEnrichPolicies() + .stream() + .map(ResolvedEnrichPolicy::matchField) + .collect(Collectors.toSet()); + Map unavailableClusters = enrichResolution.getUnavailableClusters(); + preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l, matchFields, requestFilter, enrichResolution); + }) + .>andThen((l, tuple) -> { + var indexResolution = tuple.v2(); // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index // resolution to updateExecutionInfo if (indexResolution.isValid()) { @@ -328,8 +321,8 @@ private void preAnalyze( && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel // Exception to let the LogicalPlanActionListener decide how to proceed - ll.onFailure(new NoClustersToSearchException()); - return; + finalListener.onFailure(new NoClustersToSearchException()); + // return; } Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( @@ -344,23 +337,85 @@ private void preAnalyze( enrichPolicyResolver.resolvePolicies( newClusters, unresolvedPolicies, - ll.map(newEnrichResolution -> action.apply(indexResolution, newEnrichResolution)) + new IndexResolutionWrappingListener(l, indexResolution) ); - return; + // return; + } else { + // the same enrich 
resolution received is returned in the listener + l.onResponse(tuple); } } - ll.onResponse(action.apply(indexResolution, enrichResolution)); - }), matchFields, requestFilter); - })); + if (requestFilter == null) { + // we are not interested in any kind of failures if the request doesn't have a filter, because: + // 1. either this is the second attempt to make Analysis work by excluding any potential filter + // 2. the ES|QL query didn't have a filter in the first place and there was no second try + var plan = analyzeAction.apply(tuple.v2(), tuple.v1()); + LOGGER.debug("Analyzed plan (no request filter):\n{}", plan); + finalListener.onResponse(plan); + } else { + // don't stop and pass the tuple to the next listener that runs "action" one more time + l.onResponse(tuple); + } + }) + .>andThen((l, tuple) -> { + var indexResolution = tuple.v2(); + var enrichResolution = tuple.v1(); + LogicalPlan plan = null; + LOGGER.debug("Analyzing the plan (first attempt, with filter)"); + try { + plan = analyzeAction.apply(indexResolution, enrichResolution); + } catch (VerificationException ve) { + LOGGER.debug("Analyzing the plan (first attempt, with filter) failed with {}", ve.); + // interested only in a VerificationException, but this time we are taking out the index filter + // to try and make the index resolution work without any index filtering. In the next step... 
to be continued + l.onResponse(tuple); + } + LOGGER.debug("Analyzed plan (first attempt, with filter):\n{}", plan); + finalListener.onResponse(plan); + }) + .>andThen((l, tuple) -> { + if (requestFilter != null) { + var enrichResolution = tuple.v1(); + // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API + var matchFields = enrichResolution.resolvedEnrichPolicies() + .stream() + .map(ResolvedEnrichPolicy::matchField) + .collect(Collectors.toSet()); + Map unavailableClusters = enrichResolution.getUnavailableClusters(); + + // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices + // resolving one more time + for (String clusterAlias : executionInfo.clusterAliases()) { + executionInfo.swapCluster(clusterAlias, (k, v) -> null); + } + + // here the requestFilter is set to null, performing the pre-analysis after the first step failed + preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l, matchFields, null, enrichResolution); + } else { + l.onResponse(tuple); + } + }) + .>andThen((l, tuple) -> { + if (requestFilter != null) { + LOGGER.debug("Analyzing the plan (second attempt, without filter)"); + var plan = analyzeAction.apply(tuple.v2(), tuple.v1()); + LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan); + finalListener.onResponse(plan); + } else { + l.onResponse(tuple); + } + }) + .andThenAccept(tuple -> LOGGER.debug("shouldn't have reached this point")); } private void preAnalyzeIndices( LogicalPlan parsed, EsqlExecutionInfo executionInfo, Map unavailableClusters, // known to be unavailable from the enrich policy API call - ActionListener listener, + ActionListener> listener, Set enrichPolicyMatchFields, - QueryBuilder requestFilter + QueryBuilder requestFilter, + EnrichResolution enrichResolution ) { PreAnalyzer.PreAnalysis preAnalysis = new PreAnalyzer().preAnalyze(parsed); // TODO we plan to support joins in the 
future when possible, but for now we'll just fail early if we see one @@ -401,15 +456,20 @@ private void preAnalyzeIndices( String indexExpressionToResolve = CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution - listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of()))); + listener.onResponse(new Tuple<>(enrichResolution, IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())))); } else { // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types - indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, requestFilter, listener); + indexResolver.resolveAsMergedMapping( + indexExpressionToResolve, + fieldNames, + requestFilter, + new EnrichResolutionWrappingListener(listener, enrichResolution) + ); } } else { try { // occurs when dealing with local relations (row a = 1) - listener.onResponse(IndexResolution.invalid("[none specified]")); + listener.onResponse(new Tuple<>(enrichResolution, IndexResolution.invalid("[none specified]"))); } catch (Exception ex) { listener.onFailure(ex); } @@ -559,4 +619,52 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { LOGGER.debug("Optimized physical plan:\n{}", plan); return plan; } + + class IndexResolutionWrappingListener implements ActionListener { + + private final ActionListener> delegate; + private final IndexResolution indexResolution; + + IndexResolutionWrappingListener( + ActionListener> delegate, + IndexResolution indexResolution + ) { + this.delegate = delegate; + this.indexResolution = indexResolution; + } + + @Override + public void onResponse(EnrichResolution enrichResolution) { + delegate.onResponse(new Tuple<>(enrichResolution, indexResolution)); + } + + @Override + public void onFailure(Exception e) { + delegate.onFailure(e); 
+ } + } + + class EnrichResolutionWrappingListener implements ActionListener { + + private final ActionListener> delegate; + private final EnrichResolution enrichResolution; + + EnrichResolutionWrappingListener( + ActionListener> delegate, + EnrichResolution enrichResolution + ) { + this.delegate = delegate; + this.enrichResolution = enrichResolution; + } + + @Override + public void onResponse(IndexResolution indexResolution) { + delegate.onResponse(new Tuple<>(enrichResolution, indexResolution)); + } + + @Override + public void onFailure(Exception e) { + delegate.onFailure(e); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index 257cd5ffe4ba2..bb52097824652 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -79,8 +79,12 @@ public IndexResolver(Client client) { /** * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. 
*/ - public void resolveAsMergedMapping(String indexWildcard, Set fieldNames, QueryBuilder requestFilter, - ActionListener listener) { + public void resolveAsMergedMapping( + String indexWildcard, + Set fieldNames, + QueryBuilder requestFilter, + ActionListener listener + ) { client.execute( EsqlResolveFieldsAction.TYPE, createFieldCapsRequest(indexWildcard, fieldNames, requestFilter), From 7c68f241eef933e76bd47232243aaeccf2d4e2cb Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Wed, 13 Nov 2024 19:02:54 +0200 Subject: [PATCH 03/17] whatevs --- .../java/org/elasticsearch/xpack/esql/session/EsqlSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 158627705b7c6..c3ebad1bb57a6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -365,7 +365,7 @@ public void analyzedPlan( try { plan = analyzeAction.apply(indexResolution, enrichResolution); } catch (VerificationException ve) { - LOGGER.debug("Analyzing the plan (first attempt, with filter) failed with {}", ve.); + LOGGER.debug("Analyzing the plan (first attempt, with filter) failed with {}", ve.getDetailedMessage()); // interested only in a VerificationException, but this time we are taking out the index filter // to try and make the index resolution work without any index filtering. In the next step... 
to be continued l.onResponse(tuple); From 249bba57094bf2ead1b0284f88be091febf606c1 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Thu, 14 Nov 2024 18:13:27 +0200 Subject: [PATCH 04/17] Small fix, big impact --- .../java/org/elasticsearch/xpack/esql/session/EsqlSession.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index c3ebad1bb57a6..3d91cddfc5fe6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -340,9 +340,6 @@ public void analyzedPlan( new IndexResolutionWrappingListener(l, indexResolution) ); // return; - } else { - // the same enrich resolution received is returned in the listener - l.onResponse(tuple); } } if (requestFilter == null) { From f0b5b89cb824830567fc31c6be2fcf96243c2f7d Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Fri, 15 Nov 2024 18:08:10 +0200 Subject: [PATCH 05/17] Fix some things and rewrite the listeners a bit --- .../xpack/esql/session/EsqlSession.java | 107 ++++++++++-------- 1 file changed, 59 insertions(+), 48 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 825b88b61d044..3c0c34f25b9a8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -322,7 +322,6 @@ public void analyzedPlan( // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel // Exception to let the LogicalPlanActionListener decide how to proceed finalListener.onFailure(new NoClustersToSearchException()); - 
// return; } Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( @@ -339,70 +338,82 @@ public void analyzedPlan( unresolvedPolicies, new IndexResolutionWrappingListener(l, indexResolution) ); - // return; } } - if (requestFilter == null) { - // we are not interested in any kind of failures if the request doesn't have a filter, because: - // 1. either this is the second attempt to make Analysis work by excluding any potential filter - // 2. the ES|QL query didn't have a filter in the first place and there was no second try - var plan = analyzeAction.apply(tuple.v2(), tuple.v1()); - LOGGER.debug("Analyzed plan (no request filter):\n{}", plan); - finalListener.onResponse(plan); - } else { - // don't stop and pass the tuple to the next listener that runs "action" one more time - l.onResponse(tuple); - } + // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step + l.onResponse(tuple); }) .>andThen((l, tuple) -> { var indexResolution = tuple.v2(); var enrichResolution = tuple.v1(); LogicalPlan plan = null; - LOGGER.debug("Analyzing the plan (first attempt, with filter)"); + + var filterPresentMessage = requestFilter == null ? "without" : "with"; + var attemptMessage = requestFilter == null ? "the only" : "first"; + LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage); try { plan = analyzeAction.apply(indexResolution, enrichResolution); - } catch (VerificationException ve) { - LOGGER.debug("Analyzing the plan (first attempt, with filter) failed with {}", ve.getDetailedMessage()); - // interested only in a VerificationException, but this time we are taking out the index filter - // to try and make the index resolution work without any index filtering. In the next step... 
to be continued - l.onResponse(tuple); + } catch (Exception e) { + if (e instanceof VerificationException ve) { + LOGGER.debug( + "Analyzing the plan ({} attempt, {} filter) failed with {}", + attemptMessage, + filterPresentMessage, + ve.getDetailedMessage() + ); + if (requestFilter == null) { + // if the initial request didn't have a filter, then just pass the exception back to the user + finalListener.onFailure(ve); + return; + } else { + // interested only in a VerificationException, but this time we are taking out the index filter + // to try and make the index resolution work without any index filtering. In the next step... to be continued + l.onResponse(tuple); + } + } else { + // if the query failed with any other type of exception, then just pass the exception back to the user + finalListener.onFailure(e); + return; + } } - LOGGER.debug("Analyzed plan (first attempt, with filter):\n{}", plan); + LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan); + // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning finalListener.onResponse(plan); }) .>andThen((l, tuple) -> { - if (requestFilter != null) { - var enrichResolution = tuple.v1(); - // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API - var matchFields = enrichResolution.resolvedEnrichPolicies() - .stream() - .map(ResolvedEnrichPolicy::matchField) - .collect(Collectors.toSet()); - Map unavailableClusters = enrichResolution.getUnavailableClusters(); - - // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices - // resolving one more time - for (String clusterAlias : executionInfo.clusterAliases()) { - executionInfo.swapCluster(clusterAlias, (k, v) -> null); - } + assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the 
request"; + // extracting the matchFields again (same operation) + // TODO: need to find a way of reusing the matchFields from the first async step + var enrichResolution = tuple.v1(); + // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API + var matchFields = enrichResolution.resolvedEnrichPolicies() + .stream() + .map(ResolvedEnrichPolicy::matchField) + .collect(Collectors.toSet()); + Map unavailableClusters = enrichResolution.getUnavailableClusters(); - // here the requestFilter is set to null, performing the pre-analysis after the first step failed - preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l, matchFields, null, enrichResolution); - } else { - l.onResponse(tuple); + // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices + // resolving one more time (the first attempt failed and the query had a filter) + for (String clusterAlias : executionInfo.clusterAliases()) { + executionInfo.swapCluster(clusterAlias, (k, v) -> null); } + + // here the requestFilter is set to null, performing the pre-analysis after the first step failed + preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l, matchFields, null, enrichResolution); }) - .>andThen((l, tuple) -> { - if (requestFilter != null) { - LOGGER.debug("Analyzing the plan (second attempt, without filter)"); - var plan = analyzeAction.apply(tuple.v2(), tuple.v1()); - LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan); - finalListener.onResponse(plan); - } else { - l.onResponse(tuple); + .andThenAccept(tuple -> { + assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; + LOGGER.debug("Analyzing the plan (second attempt, without filter)"); + LogicalPlan plan; + try { + plan = analyzeAction.apply(tuple.v2(), tuple.v1()); + } catch (Exception e) { + finalListener.onFailure(e); + return; } - 
}) - .andThenAccept(tuple -> LOGGER.debug("shouldn't have reached this point")); + LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan); + finalListener.onResponse(plan); + }); } private void preAnalyzeIndices( From 8c3a05ac1b63374371843d4a77fa57d152fced86 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Mon, 18 Nov 2024 16:05:41 +0200 Subject: [PATCH 06/17] Link the original Listener to the Subscribable one, as well --- .../xpack/esql/session/EsqlSession.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 3c0c34f25b9a8..68d865c2a1947 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -401,19 +401,23 @@ public void analyzedPlan( // here the requestFilter is set to null, performing the pre-analysis after the first step failed preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l, matchFields, null, enrichResolution); }) - .andThenAccept(tuple -> { + // .andThenAccept(tuple -> { + .andThen((l, tuple) -> { assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; LOGGER.debug("Analyzing the plan (second attempt, without filter)"); LogicalPlan plan; try { plan = analyzeAction.apply(tuple.v2(), tuple.v1()); } catch (Exception e) { - finalListener.onFailure(e); + // finalListener.onFailure(e); + l.onFailure(e); return; } LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan); - finalListener.onResponse(plan); - }); + // finalListener.onResponse(plan); + l.onResponse(plan); + }) + .addListener(finalListener); } private void preAnalyzeIndices( From c3964246f07f0f47d795984e6f73fd90c86e4f5b Mon Sep 17 00:00:00 
2001 From: Andrei Stefan Date: Mon, 18 Nov 2024 17:43:40 +0200 Subject: [PATCH 07/17] Small fix --- .../java/org/elasticsearch/xpack/esql/session/EsqlSession.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 68d865c2a1947..06fb237855afe 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -322,6 +322,7 @@ public void analyzedPlan( // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel // Exception to let the LogicalPlanActionListener decide how to proceed finalListener.onFailure(new NoClustersToSearchException()); + return; } Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( @@ -338,6 +339,7 @@ public void analyzedPlan( unresolvedPolicies, new IndexResolutionWrappingListener(l, indexResolution) ); + return; } } // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step From 9ef19a0ff3f619369b6d4bf57cc855af7dfc00bb Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 19 Nov 2024 16:54:59 +0200 Subject: [PATCH 08/17] wip --- .../java/org/elasticsearch/xpack/esql/session/EsqlSession.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 06fb237855afe..a4d446fb2a370 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -403,7 +403,6 @@ public void analyzedPlan( // here the requestFilter is set to null, 
performing the pre-analysis after the first step failed preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l, matchFields, null, enrichResolution); }) - // .andThenAccept(tuple -> { .andThen((l, tuple) -> { assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; LOGGER.debug("Analyzing the plan (second attempt, without filter)"); @@ -411,12 +410,10 @@ public void analyzedPlan( try { plan = analyzeAction.apply(tuple.v2(), tuple.v1()); } catch (Exception e) { - // finalListener.onFailure(e); l.onFailure(e); return; } LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan); - // finalListener.onResponse(plan); l.onResponse(plan); }) .addListener(finalListener); From baa68917aedfa6e066651a7c79098ba45eab88a2 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Mon, 25 Nov 2024 21:39:57 +0200 Subject: [PATCH 09/17] Fix after update from main --- .../xpack/esql/session/EsqlSession.java | 320 ++++++++++-------- 1 file changed, 188 insertions(+), 132 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 4112470856aa5..5d762a3e48ed6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.compute.data.Block; @@ -20,7 +21,6 @@ import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; 
-import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; @@ -79,7 +79,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -273,18 +273,20 @@ private LogicalPlan parse(String query, QueryParams params) { public void analyzedPlan( LogicalPlan parsed, EsqlExecutionInfo executionInfo, - ActionListener finalListener, + ActionListener logicalPlanListener, QueryBuilder requestFilter ) { if (parsed.analyzed()) { - finalListener.onResponse(parsed); + logicalPlanListener.onResponse(parsed); return; } - BiFunction analyzeAction = (indices, policies) -> { + TriFunction analyzeAction = (indices, lookupIndices, policies) -> { planningMetrics.gatherPreAnalysisMetrics(parsed); - Analyzer analyzer = new Analyzer(new AnalyzerContext(configuration, functionRegistry, indices, policies), verifier); - + Analyzer analyzer = new Analyzer( + new AnalyzerContext(configuration, functionRegistry, indices, lookupIndices, policies), + verifier + ); LogicalPlan plan = analyzer.analyze(parsed); plan.setAnalyzed(); return plan; @@ -301,98 +303,40 @@ public void analyzedPlan( ).keySet(); SubscribableListener.newForked(l -> enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, l)) - .>andThen((l, enrichResolution) -> { + .andThen((l, enrichResolution) -> { // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API - var matchFields = enrichResolution.resolvedEnrichPolicies() + var enrichMatchFields = enrichResolution.resolvedEnrichPolicies() .stream() .map(ResolvedEnrichPolicy::matchField) .collect(Collectors.toSet()); - Map unavailableClusters = enrichResolution.getUnavailableClusters(); - preAnalyzeIndices(parsed, 
executionInfo, unavailableClusters, l, matchFields, requestFilter, enrichResolution); - }) - .>andThen((l, tuple) -> { - var indexResolution = tuple.v2(); - // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index - // resolution to updateExecutionInfo - if (indexResolution.isValid()) { - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); - if (executionInfo.isCrossClusterSearch() - && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { - // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel - // Exception to let the LogicalPlanActionListener decide how to proceed - finalListener.onFailure(new NoClustersToSearchException()); - return; - } + // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy + var fieldNames = fieldNames(parsed, enrichMatchFields); + ListenerResult listenerResult = new ListenerResult(enrichResolution, fieldNames, null, null); - Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( - indexResolution.get().concreteIndices().toArray(String[]::new) - ).keySet(); - // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again - // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again. 
- // TODO: add a test for this - if (targetClusters.containsAll(newClusters) == false - // do not bother with a re-resolution if only remotes were requested and all were offline - && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { - enrichPolicyResolver.resolvePolicies( - newClusters, - unresolvedPolicies, - new IndexResolutionWrappingListener(l, indexResolution) - ); + // first resolve the lookup indices, then the main indices + preAnalyzeLookupIndices(preAnalysis.lookupIndices, listenerResult, l); + }) + .andThen((l, listenerResult) -> { + // resolve the main indices + preAnalyzeIndices(parsed, executionInfo, listenerResult, requestFilter, l); + }) + .andThen((l, listenerResult) -> { + // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for + // invalid index resolution to updateExecutionInfo + if (listenerResult.indices.isValid()) { + // CCS indices and skip_unavailable cluster values can stop the analysis right here + if (analyzeCCSIndices(executionInfo, logicalPlanListener, l, listenerResult, targetClusters, unresolvedPolicies)) return; - } } // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step - l.onResponse(tuple); + l.onResponse(listenerResult); }) - .>andThen((l, tuple) -> { - var indexResolution = tuple.v2(); - var enrichResolution = tuple.v1(); - LogicalPlan plan = null; - - var filterPresentMessage = requestFilter == null ? "without" : "with"; - var attemptMessage = requestFilter == null ? 
"the only" : "first"; - LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage); - try { - plan = analyzeAction.apply(indexResolution, enrichResolution); - } catch (Exception e) { - if (e instanceof VerificationException ve) { - LOGGER.debug( - "Analyzing the plan ({} attempt, {} filter) failed with {}", - attemptMessage, - filterPresentMessage, - ve.getDetailedMessage() - ); - if (requestFilter == null) { - // if the initial request didn't have a filter, then just pass the exception back to the user - finalListener.onFailure(ve); - return; - } else { - // interested only in a VerificationException, but this time we are taking out the index filter - // to try and make the index resolution work without any index filtering. In the next step... to be continued - l.onResponse(tuple); - } - } else { - // if the query failed with any other type of exception, then just pass the exception back to the user - finalListener.onFailure(e); - return; - } - } - LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan); - // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning - finalListener.onResponse(plan); + .andThen((l, listenerResult) -> { + // first attempt (maybe the only one) at analyzing the plan + analyzeAndMaybeRetry(analyzeAction, requestFilter, listenerResult, l, logicalPlanListener); }) - .>andThen((l, tuple) -> { + .andThen((l, listenerResult) -> { assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; - // extracting the matchFields again (same operation) - // TODO: need to find a way of reusing the matchFields from the first async step - var enrichResolution = tuple.v1(); - // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API - var matchFields = 
enrichResolution.resolvedEnrichPolicies() - .stream() - .map(ResolvedEnrichPolicy::matchField) - .collect(Collectors.toSet()); - Map unavailableClusters = enrichResolution.getUnavailableClusters(); // "reset" execution information for all ccs or non-ccs (local) clusters, since we are performing the indices // resolving one more time (the first attempt failed and the query had a filter) @@ -401,14 +345,14 @@ public void analyzedPlan( } // here the requestFilter is set to null, performing the pre-analysis after the first step failed - preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l, matchFields, null, enrichResolution); + preAnalyzeIndices(parsed, executionInfo, listenerResult, null, l); }) - .andThen((l, tuple) -> { + .andThen((l, listenerResult) -> { assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; LOGGER.debug("Analyzing the plan (second attempt, without filter)"); LogicalPlan plan; try { - plan = analyzeAction.apply(tuple.v2(), tuple.v1()); + plan = analyzeAction.apply(listenerResult.indices, listenerResult.lookupIndices, listenerResult.enrichResolution); } catch (Exception e) { l.onFailure(e); return; @@ -416,17 +360,46 @@ public void analyzedPlan( LOGGER.debug("Analyzed plan (second attempt, without filter):\n{}", plan); l.onResponse(plan); }) - .addListener(finalListener); + .addListener(logicalPlanListener); + } + + private void preAnalyzeLookupIndices(List indices, ListenerResult listenerResult, ActionListener listener) { + if (indices.size() > 1) { + // Note: JOINs on more than one index are not yet supported + listener.onFailure(new MappingException("More than one LOOKUP JOIN is not supported")); + } else if (indices.size() == 1) { + TableInfo tableInfo = indices.get(0); + TableIdentifier table = tableInfo.id(); + // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types + indexResolver.resolveAsMergedMapping( + table.index(), + 
listenerResult.fieldNames(), + null, + new ListenerResultWrappingListener<>(listener, listenerResult, listenerResult::withIndexResolution) + ); + } else { + try { + // No lookup indices specified + listener.onResponse( + new ListenerResult( + listenerResult.enrichResolution, + listenerResult.fieldNames, + IndexResolution.invalid("[none specified]"), + listenerResult.indices + ) + ); + } catch (Exception ex) { + listener.onFailure(ex); + } + } } private void preAnalyzeIndices( LogicalPlan parsed, EsqlExecutionInfo executionInfo, - Map unavailableClusters, // known to be unavailable from the enrich policy API call - ActionListener> listener, - Set enrichPolicyMatchFields, + ListenerResult listenerResult, QueryBuilder requestFilter, - EnrichResolution enrichResolution + ActionListener listener ) { PreAnalyzer.PreAnalysis preAnalysis = new PreAnalyzer().preAnalyze(parsed); // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one @@ -434,9 +407,10 @@ private void preAnalyzeIndices( // Note: JOINs are not supported but we detect them when listener.onFailure(new MappingException("Queries with multiple indices are not supported")); } else if (preAnalysis.indices.size() == 1) { + // known to be unavailable from the enrich policy API call + Map unavailableClusters = listenerResult.enrichResolution.getUnavailableClusters(); TableInfo tableInfo = preAnalysis.indices.get(0); TableIdentifier table = tableInfo.id(); - var fieldNames = fieldNames(parsed, enrichPolicyMatchFields); Map clusterIndices = indicesExpressionGrouper.groupIndices(IndicesOptions.DEFAULT, table.index()); for (Map.Entry entry : clusterIndices.entrySet()) { @@ -467,26 +441,119 @@ private void preAnalyzeIndices( String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return 
an empty IndexResolution - listener.onResponse(new Tuple<>(enrichResolution, IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())))); + listener.onResponse( + new ListenerResult( + listenerResult.enrichResolution, + listenerResult.fieldNames, + listenerResult.lookupIndices, + IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())) + ) + ); } else { // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types indexResolver.resolveAsMergedMapping( indexExpressionToResolve, - fieldNames, + listenerResult.fieldNames, requestFilter, - new EnrichResolutionWrappingListener(listener, enrichResolution) + new ListenerResultWrappingListener<>(listener, listenerResult, listenerResult::withIndexResolution) ); } } else { try { // occurs when dealing with local relations (row a = 1) - listener.onResponse(new Tuple<>(enrichResolution, IndexResolution.invalid("[none specified]"))); + listener.onResponse( + new ListenerResult( + listenerResult.enrichResolution, + listenerResult.fieldNames, + listenerResult.lookupIndices, + IndexResolution.invalid("[none specified]") + ) + ); } catch (Exception ex) { listener.onFailure(ex); } } } + private boolean analyzeCCSIndices( + EsqlExecutionInfo executionInfo, + ActionListener logicalPlanListener, + ActionListener l, + ListenerResult listenerResult, + Set targetClusters, + Set unresolvedPolicies + ) { + IndexResolution indexResolution = listenerResult.indices; + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); + if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { + // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception + // to let the LogicalPlanActionListener decide how to proceed + 
logicalPlanListener.onFailure(new NoClustersToSearchException()); + return true; + } + + Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( + indexResolution.get().concreteIndices().toArray(String[]::new) + ).keySet(); + // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again + // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again. + // TODO: add a test for this + if (targetClusters.containsAll(newClusters) == false + // do not bother with a re-resolution if only remotes were requested and all were offline + && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { + enrichPolicyResolver.resolvePolicies( + newClusters, + unresolvedPolicies, + new ListenerResultWrappingListener<>(l, listenerResult, listenerResult::withEnrichResolution) + ); + return true; + } + return false; + } + + private static void analyzeAndMaybeRetry( + TriFunction analyzeAction, + QueryBuilder requestFilter, + ListenerResult listenerResult, + ActionListener l, + ActionListener logicalPlanListener + ) { + LogicalPlan plan = null; + var filterPresentMessage = requestFilter == null ? "without" : "with"; + var attemptMessage = requestFilter == null ? 
"the only" : "first"; + LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage); + + try { + plan = analyzeAction.apply(listenerResult.indices, listenerResult.lookupIndices, listenerResult.enrichResolution); + } catch (Exception e) { + if (e instanceof VerificationException ve) { + LOGGER.debug( + "Analyzing the plan ({} attempt, {} filter) failed with {}", + attemptMessage, + filterPresentMessage, + ve.getDetailedMessage() + ); + if (requestFilter == null) { + // if the initial request didn't have a filter, then just pass the exception back to the user + logicalPlanListener.onFailure(ve); + return; + } else { + // interested only in a VerificationException, but this time we are taking out the index filter + // to try and make the index resolution work without any index filtering. In the next step... to be continued + l.onResponse(listenerResult); + } + } else { + // if the query failed with any other type of exception, then just pass the exception back to the user + logicalPlanListener.onFailure(e); + return; + } + } + LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan); + // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning + logicalPlanListener.onResponse(plan); + } + static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchFields) { if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) { // no explicit columns selection, for example "from employees" @@ -631,22 +698,20 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { return plan; } - class IndexResolutionWrappingListener implements ActionListener { - - private final ActionListener> delegate; - private final IndexResolution indexResolution; + class ListenerResultWrappingListener implements ActionListener { + private final ActionListener delegate; + private final ListenerResult result; + 
private final Function setter; - IndexResolutionWrappingListener( - ActionListener> delegate, - IndexResolution indexResolution - ) { + ListenerResultWrappingListener(ActionListener delegate, ListenerResult result, Function setter) { this.delegate = delegate; - this.indexResolution = indexResolution; + this.result = result; + this.setter = setter; } @Override - public void onResponse(EnrichResolution enrichResolution) { - delegate.onResponse(new Tuple<>(enrichResolution, indexResolution)); + public void onResponse(T t) { + delegate.onResponse(setter.apply(t)); } @Override @@ -655,27 +720,18 @@ public void onFailure(Exception e) { } } - class EnrichResolutionWrappingListener implements ActionListener { - - private final ActionListener> delegate; - private final EnrichResolution enrichResolution; - - EnrichResolutionWrappingListener( - ActionListener> delegate, - EnrichResolution enrichResolution - ) { - this.delegate = delegate; - this.enrichResolution = enrichResolution; - } - - @Override - public void onResponse(IndexResolution indexResolution) { - delegate.onResponse(new Tuple<>(enrichResolution, indexResolution)); + private record ListenerResult( + EnrichResolution enrichResolution, + Set fieldNames, + IndexResolution lookupIndices, + IndexResolution indices + ) { + ListenerResult withEnrichResolution(EnrichResolution newEnrichResolution) { + return new ListenerResult(newEnrichResolution, fieldNames(), lookupIndices(), indices()); } - @Override - public void onFailure(Exception e) { - delegate.onFailure(e); + ListenerResult withIndexResolution(IndexResolution newIndexResolution) { + return new ListenerResult(enrichResolution(), fieldNames(), lookupIndices(), newIndexResolution); } - } + }; } From 74185d42d302b510a603716c6d6d9fd2fc3ef740 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Mon, 25 Nov 2024 22:35:51 +0200 Subject: [PATCH 10/17] Put back code that existed before update from main --- .../xpack/esql/session/EsqlSession.java | 23 +++++++++++++------ 1 
file changed, 16 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 5d762a3e48ed6..0be2a1a3a66df 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -65,6 +65,8 @@ import org.elasticsearch.xpack.esql.plan.logical.RegexExtract; import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; +import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; +import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; @@ -318,7 +320,7 @@ public void analyzedPlan( }) .andThen((l, listenerResult) -> { // resolve the main indices - preAnalyzeIndices(parsed, executionInfo, listenerResult, requestFilter, l); + preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, requestFilter, l); }) .andThen((l, listenerResult) -> { // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for @@ -345,7 +347,7 @@ public void analyzedPlan( } // here the requestFilter is set to null, performing the pre-analysis after the first step failed - preAnalyzeIndices(parsed, executionInfo, listenerResult, null, l); + preAnalyzeIndices(preAnalysis.indices, executionInfo, listenerResult, null, l); }) .andThen((l, listenerResult) -> { assert requestFilter != null : "The second analysis shouldn't take place when there is no index filter in the request"; @@ -395,21 +397,20 @@ private void preAnalyzeLookupIndices(List indices, ListenerResult lis } private void 
preAnalyzeIndices( - LogicalPlan parsed, + List indices, EsqlExecutionInfo executionInfo, ListenerResult listenerResult, QueryBuilder requestFilter, ActionListener listener ) { - PreAnalyzer.PreAnalysis preAnalysis = new PreAnalyzer().preAnalyze(parsed); // TODO we plan to support joins in the future when possible, but for now we'll just fail early if we see one - if (preAnalysis.indices.size() > 1) { + if (indices.size() > 1) { // Note: JOINs are not supported but we detect them when listener.onFailure(new MappingException("Queries with multiple indices are not supported")); - } else if (preAnalysis.indices.size() == 1) { + } else if (indices.size() == 1) { // known to be unavailable from the enrich policy API call Map unavailableClusters = listenerResult.enrichResolution.getUnavailableClusters(); - TableInfo tableInfo = preAnalysis.indices.get(0); + TableInfo tableInfo = indices.get(0); TableIdentifier table = tableInfo.id(); Map clusterIndices = indicesExpressionGrouper.groupIndices(IndicesOptions.DEFAULT, table.index()); @@ -575,6 +576,7 @@ static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchF // "keep" attributes are special whenever a wildcard is used in their name // ie "from test | eval lang = languages + 1 | keep *l" should consider both "languages" and "*l" as valid fields to ask for AttributeSet keepCommandReferences = new AttributeSet(); + AttributeSet keepJoinReferences = new AttributeSet(); List> keepMatches = new ArrayList<>(); List keepPatterns = new ArrayList<>(); @@ -593,6 +595,11 @@ static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchF // The exact name of the field will be added later as part of enrichPolicyMatchFields Set enrichRefs.removeIf(attr -> attr instanceof EmptyAttribute); references.addAll(enrichRefs); + } else if (p instanceof LookupJoin join) { + keepJoinReferences.addAll(join.config().matchFields()); // TODO: why is this empty + if (join.config().type() instanceof JoinTypes.UsingJoinType usingJoinType) { + 
keepJoinReferences.addAll(usingJoinType.columns()); + } } else { references.addAll(p.references()); if (p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES) { @@ -626,6 +633,8 @@ static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchF references.removeIf(attr -> matchByName(attr, alias.name(), keepCommandReferences.contains(attr))); }); }); + // Add JOIN ON column references afterward to avoid Alias removal + references.addAll(keepJoinReferences); // remove valid metadata attributes because they will be filtered out by the IndexResolver anyway // otherwise, in some edge cases, we will fail to ask for "*" (all fields) instead From 8d55211ee6cacaec5bafe36d30adec60130b0f19 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 26 Nov 2024 08:13:56 +0200 Subject: [PATCH 11/17] Small fix Align ListenerResult params with AnalyzerContext params --- .../xpack/esql/session/EsqlSession.java | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 0be2a1a3a66df..48ab304b0c384 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -313,7 +313,7 @@ public void analyzedPlan( .collect(Collectors.toSet()); // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy var fieldNames = fieldNames(parsed, enrichMatchFields); - ListenerResult listenerResult = new ListenerResult(enrichResolution, fieldNames, null, null); + ListenerResult listenerResult = new ListenerResult(null, null, enrichResolution, fieldNames); // first resolve the lookup indices, then the main indices preAnalyzeLookupIndices(preAnalysis.lookupIndices, listenerResult, l); @@ -377,17 +377,17 
@@ private void preAnalyzeLookupIndices(List indices, ListenerResult lis table.index(), listenerResult.fieldNames(), null, - new ListenerResultWrappingListener<>(listener, listenerResult, listenerResult::withIndexResolution) + new ListenerResultWrappingListener<>(listener, listenerResult, listenerResult::withLookupIndexResolution) ); } else { try { // No lookup indices specified listener.onResponse( new ListenerResult( - listenerResult.enrichResolution, - listenerResult.fieldNames, + listenerResult.indices, IndexResolution.invalid("[none specified]"), - listenerResult.indices + listenerResult.enrichResolution, + listenerResult.fieldNames ) ); } catch (Exception ex) { @@ -444,10 +444,10 @@ private void preAnalyzeIndices( // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse( new ListenerResult( - listenerResult.enrichResolution, - listenerResult.fieldNames, + IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())), listenerResult.lookupIndices, - IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())) + listenerResult.enrichResolution, + listenerResult.fieldNames ) ); } else { @@ -464,10 +464,10 @@ private void preAnalyzeIndices( // occurs when dealing with local relations (row a = 1) listener.onResponse( new ListenerResult( - listenerResult.enrichResolution, - listenerResult.fieldNames, + IndexResolution.invalid("[none specified]"), listenerResult.lookupIndices, - IndexResolution.invalid("[none specified]") + listenerResult.enrichResolution, + listenerResult.fieldNames ) ); } catch (Exception ex) { @@ -730,17 +730,21 @@ public void onFailure(Exception e) { } private record ListenerResult( - EnrichResolution enrichResolution, - Set fieldNames, + IndexResolution indices, IndexResolution lookupIndices, - IndexResolution indices + EnrichResolution enrichResolution, + Set fieldNames ) { ListenerResult withEnrichResolution(EnrichResolution 
newEnrichResolution) { - return new ListenerResult(newEnrichResolution, fieldNames(), lookupIndices(), indices()); + return new ListenerResult(indices(), lookupIndices(), newEnrichResolution, fieldNames() ); } ListenerResult withIndexResolution(IndexResolution newIndexResolution) { - return new ListenerResult(enrichResolution(), fieldNames(), lookupIndices(), newIndexResolution); + return new ListenerResult(newIndexResolution, lookupIndices(), enrichResolution(), fieldNames()); + } + + ListenerResult withLookupIndexResolution(IndexResolution newIndexResolution) { + return new ListenerResult(indices(), newIndexResolution, enrichResolution(), fieldNames()); } }; } From 3be6c3ba441b12a8ed01c5fd5d91e5a1fe3e6a5d Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 26 Nov 2024 09:20:45 +0200 Subject: [PATCH 12/17] spotless --- .../java/org/elasticsearch/xpack/esql/session/EsqlSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 48ab304b0c384..129b65cf2fb64 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -736,7 +736,7 @@ private record ListenerResult( Set fieldNames ) { ListenerResult withEnrichResolution(EnrichResolution newEnrichResolution) { - return new ListenerResult(indices(), lookupIndices(), newEnrichResolution, fieldNames() ); + return new ListenerResult(indices(), lookupIndices(), newEnrichResolution, fieldNames()); } ListenerResult withIndexResolution(IndexResolution newIndexResolution) { From 8e5e462d4ea45a752d77ca2b2bdb5fc4a37c1d91 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 26 Nov 2024 21:11:57 +0200 Subject: [PATCH 13/17] Add IT tests --- .../multi_node/RequestIndexFilteringIT.java | 41 +++ 
.../single_node/RequestIndexFilteringIT.java | 41 +++ .../rest/RequestIndexFilteringTestCase.java | 290 ++++++++++++++++++ .../esql/qa/rest/RestEnrichTestCase.java | 176 ++++++++++- 4 files changed, 537 insertions(+), 11 deletions(-) create mode 100644 x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java create mode 100644 x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java create mode 100644 x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java diff --git a/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java new file mode 100644 index 0000000000000..c74d305ba3bf3 --- /dev/null +++ b/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.qa.multi_node; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase; +import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; +import org.junit.ClassRule; + +import java.util.Arrays; +import java.util.List; + +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase { + + @ClassRule + public static ElasticsearchCluster cluster = Clusters.testCluster(ignored -> {}); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @ParametersFactory(argumentFormatting = "%1s") + public static List modes() { + return Arrays.stream(RestEsqlTestCase.Mode.values()).map(m -> new Object[] { m }).toList(); + } + + public RequestIndexFilteringIT(RestEsqlTestCase.Mode mode) { + super(mode); + } +} diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java new file mode 100644 index 0000000000000..5f8e67d14d8fd --- /dev/null +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.qa.single_node; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase; +import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; +import org.junit.ClassRule; + +import java.util.Arrays; +import java.util.List; + +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase { + + @ClassRule + public static ElasticsearchCluster cluster = Clusters.testCluster(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @ParametersFactory(argumentFormatting = "%1s") + public static List modes() { + return Arrays.stream(RestEsqlTestCase.Mode.values()).map(m -> new Object[] { m }).toList(); + } + + public RequestIndexFilteringIT(RestEsqlTestCase.Mode mode) { + super(mode); + } +} diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java new file mode 100644 index 0000000000000..e67ddc4c6a4c1 --- /dev/null +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -0,0 +1,290 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.qa.rest; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.esql.AssertWarnings; +import org.junit.After; +import org.junit.Assert; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.test.ListMatcher.matchesList; +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.entityToMap; +import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.nullValue; + +public abstract class RequestIndexFilteringTestCase extends ESRestTestCase { + + protected final RestEsqlTestCase.Mode mode; + + protected RequestIndexFilteringTestCase(RestEsqlTestCase.Mode mode) { + this.mode = mode; + } + + @After + public void wipeTestData() throws IOException { + try { + var response = client().performRequest(new Request("DELETE", "/test*")); + assertEquals(200, response.getStatusLine().getStatusCode()); + } catch (ResponseException re) { + assertEquals(404, re.getResponse().getStatusLine().getStatusCode()); + } + } + + public void testTimestampFilterFromQuery() throws IOException { + int docsTest1 = 50; + int docsTest2 = 30; + indexTimestampData(docsTest1, "test1", 
"2024-11-26", "id1"); + indexTimestampData(docsTest2, "test2", "2023-11-26", "id2"); + + // filter includes both indices in the result (all columns, all rows) + RestEsqlTestCase.RequestObjectBuilder builder = timestampFilter("gte", "2023-01-01").query("FROM test*"); + Map result = runEsql(builder); + assertMap( + result, + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "id2").entry("type", "integer")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1 + docsTest2))).entry("took", greaterThanOrEqualTo(0)) + ); + + // filter includes only test1. Columns from test2 are filtered out, as well (not only rows)! + builder = timestampFilter("gte", "2024-01-01").query("FROM test*"); + assertMap( + runEsql(builder), + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1))).entry("took", greaterThanOrEqualTo(0)) + ); + + // filter excludes both indices (no rows); the first analysis step fails because there are no columns, a second attempt succeeds + // after eliminating the index filter. All columns are returned. 
+ builder = timestampFilter("gte", "2025-01-01").query("FROM test*"); + assertMap( + runEsql(builder), + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "id2").entry("type", "integer")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + ).entry("values", allOf(instanceOf(List.class), hasSize(0))).entry("took", greaterThanOrEqualTo(0)) + ); + } + + public void testFieldExistsFilter_KeepWildcard() throws IOException { + int docsTest1 = randomIntBetween(0, 10); + int docsTest2 = randomIntBetween(0, 10); + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + indexTimestampData(docsTest2, "test2", "2023-11-26", "id2"); + + // filter includes only test1. Columns and rows of test2 are filtered out + RestEsqlTestCase.RequestObjectBuilder builder = existsFilter("id1").query("FROM test*"); + Map result = runEsql(builder); + assertMap( + result, + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "@timestamp").entry("type", "date")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "value").entry("type", "long")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1))).entry("took", greaterThanOrEqualTo(0)) + ); + + // filter includes only test1. Columns from test2 are filtered out, as well (not only rows)! 
+ builder = existsFilter("id1").query("FROM test* METADATA _index | KEEP _index, id*"); + result = runEsql(builder); + assertMap( + result, + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "_index").entry("type", "keyword")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1))).entry("took", greaterThanOrEqualTo(0)) + ); + @SuppressWarnings("unchecked") + var values = (List>) result.get("values"); + for (List row : values) { + assertThat(row.get(0), equalTo("test1")); + assertThat(row.get(1), instanceOf(Integer.class)); + } + } + + public void testFieldExistsFilter_With_ExplicitUseOfDiscardedIndexFields() throws IOException { + int docsTest1 = randomIntBetween(1, 5); + int docsTest2 = randomIntBetween(0, 5); + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + indexTimestampData(docsTest2, "test2", "2023-11-26", "id2"); + + // test2 is explicitly used in a query with "SORT id2" even if the index filter should discard test2 + RestEsqlTestCase.RequestObjectBuilder builder = existsFilter("id1").query( + "FROM test* METADATA _index | SORT id2 | KEEP _index, id*" + ); + Map result = runEsql(builder); + assertMap( + result, + matchesMap().entry( + "columns", + matchesList().item(matchesMap().entry("name", "_index").entry("type", "keyword")) + .item(matchesMap().entry("name", "id1").entry("type", "integer")) + .item(matchesMap().entry("name", "id2").entry("type", "integer")) + ).entry("values", allOf(instanceOf(List.class), hasSize(docsTest1))).entry("took", greaterThanOrEqualTo(0)) + ); + @SuppressWarnings("unchecked") + var values = (List>) result.get("values"); + for (List row : values) { + assertThat(row.get(0), equalTo("test1")); + assertThat(row.get(1), instanceOf(Integer.class)); + assertThat(row.get(2), nullValue()); + } + } + + public void testFieldNameTypo() throws IOException { + int docsTest1 = randomIntBetween(0, 5); + int 
docsTest2 = randomIntBetween(0, 5); + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + indexTimestampData(docsTest2, "test2", "2023-11-26", "id2"); + + // idx field name is explicitly used, though it doesn't exist in any of the indices. First test - without filter + ResponseException e = expectThrows( + ResponseException.class, + () -> runEsql(requestObjectBuilder().query("FROM test* | WHERE idx == 123")) + ); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]")); + + e = expectThrows(ResponseException.class, () -> runEsql(requestObjectBuilder().query("FROM test1 | WHERE idx == 123"))); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]")); + + e = expectThrows( + ResponseException.class, + () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test* | WHERE idx == 123")) + ); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]")); + + e = expectThrows( + ResponseException.class, + () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM test2 | WHERE idx == 123")) + ); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 1:20: Unknown column [idx]")); + } + + public void testIndicesDontExist() throws IOException { + int docsTest1 = 0; // we are interested only in the created index, not 
necessarily that it has data + indexTimestampData(docsTest1, "test1", "2024-11-26", "id1"); + + ResponseException e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo"))); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("Unknown index [foo]")); + + e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo*"))); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("Unknown index [foo*]")); + + e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo,test1"))); + assertEquals(404, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("index_not_found_exception")); + assertThat(e.getMessage(), containsString("no such index [foo]")); + } + + private static RestEsqlTestCase.RequestObjectBuilder timestampFilter(String op, String date) throws IOException { + return requestObjectBuilder().filter(b -> { + b.startObject("range"); + { + b.startObject("@timestamp").field(op, date).endObject(); + } + b.endObject(); + }); + } + + private static RestEsqlTestCase.RequestObjectBuilder existsFilter(String field) throws IOException { + return requestObjectBuilder().filter(b -> b.startObject("exists").field("field", field).endObject()); + } + + public Map runEsql(RestEsqlTestCase.RequestObjectBuilder requestObject) throws IOException { + return RestEsqlTestCase.runEsql(requestObject, new AssertWarnings.NoWarnings(), mode); + } + + protected void indexTimestampData(int docs, String indexName, String date, String differentiatorFieldName) throws IOException { + Request createIndex = new Request("PUT", indexName); + 
createIndex.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 3 + } + }, + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "%differentiator_field_name%": { + "type": "integer" + } + } + } + }""".replace("%differentiator_field_name%", differentiatorFieldName)); + Response response = client().performRequest(createIndex); + assertThat( + entityToMap(response.getEntity(), XContentType.JSON), + matchesMap().entry("shards_acknowledged", true).entry("index", indexName).entry("acknowledged", true) + ); + + if (docs > 0) { + StringBuilder b = new StringBuilder(); + for (int i = 0; i < docs; i++) { + b.append(String.format(Locale.ROOT, """ + {"create":{"_index":"%s"}} + {"@timestamp":"%s","value":%d,"%s":%d} + """, indexName, date, i, differentiatorFieldName, i)); + } + Request bulk = new Request("POST", "/_bulk"); + bulk.addParameter("refresh", "true"); + bulk.addParameter("filter_path", "errors"); + bulk.setJsonEntity(b.toString()); + response = client().performRequest(bulk); + Assert.assertEquals("{\"errors\":false}", EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + } + } +} diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEnrichTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEnrichTestCase.java index def6491fb920f..bf4a4400e13cf 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEnrichTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEnrichTestCase.java @@ -12,7 +12,9 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xcontent.XContentBuilder; import org.junit.After; import org.junit.Before; @@ 
-29,7 +31,6 @@ public abstract class RestEnrichTestCase extends ESRestTestCase { private static final String sourceIndexName = "countries"; - private static final String testIndexName = "test"; private static final String policyName = "countries"; public enum Mode { @@ -56,7 +57,7 @@ public void assertRequestBreakerEmpty() throws Exception { @Before public void loadTestData() throws IOException { - Request request = new Request("PUT", "/" + testIndexName); + Request request = new Request("PUT", "/test1"); request.setJsonEntity(""" { "mappings": { @@ -72,7 +73,7 @@ public void loadTestData() throws IOException { }"""); assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); - request = new Request("POST", "/" + testIndexName + "/_bulk"); + request = new Request("POST", "/test1/_bulk"); request.addParameter("refresh", "true"); request.setJsonEntity(""" { "index": {"_id": 1} } @@ -84,6 +85,34 @@ public void loadTestData() throws IOException { """); assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + request = new Request("PUT", "/test2"); + request.setJsonEntity(""" + { + "mappings": { + "properties": { + "geo.dest": { + "type": "keyword" + }, + "country_number": { + "type": "long" + } + } + } + }"""); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + + request = new Request("POST", "/test2/_bulk"); + request.addParameter("refresh", "true"); + request.setJsonEntity(""" + { "index": {"_id": 1} } + { "geo.dest": "IN", "country_number": 2 } + { "index": {"_id": 2} } + { "geo.dest": "IN", "country_number": 2 } + { "index": {"_id": 3} } + { "geo.dest": "US", "country_number": 3 } + """); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + request = new Request("PUT", "/" + sourceIndexName); request.setJsonEntity(""" { @@ -131,7 +160,7 @@ public void loadTestData() throws IOException { @After public void wipeTestData() throws 
IOException { try { - var response = client().performRequest(new Request("DELETE", "/" + testIndexName)); + var response = client().performRequest(new Request("DELETE", "/test1,test2")); assertEquals(200, response.getStatusLine().getStatusCode()); response = client().performRequest(new Request("DELETE", "/" + sourceIndexName)); assertEquals(200, response.getStatusLine().getStatusCode()); @@ -143,7 +172,7 @@ public void wipeTestData() throws IOException { } public void testNonExistentEnrichPolicy() throws IOException { - ResponseException re = expectThrows(ResponseException.class, () -> runEsql("from test | enrich countris", Mode.SYNC)); + ResponseException re = expectThrows(ResponseException.class, () -> runEsql("from test1 | enrich countris", null, Mode.SYNC)); assertThat( EntityUtils.toString(re.getResponse().getEntity()), containsString("cannot find enrich policy [countris], did you mean [countries]?") @@ -151,7 +180,10 @@ public void testNonExistentEnrichPolicy() throws IOException { } public void testNonExistentEnrichPolicy_KeepField() throws IOException { - ResponseException re = expectThrows(ResponseException.class, () -> runEsql("from test | enrich countris | keep number", Mode.SYNC)); + ResponseException re = expectThrows( + ResponseException.class, + () -> runEsql("from test1 | enrich countris | keep number", null, Mode.SYNC) + ); assertThat( EntityUtils.toString(re.getResponse().getEntity()), containsString("cannot find enrich policy [countris], did you mean [countries]?") @@ -159,25 +191,147 @@ public void testNonExistentEnrichPolicy_KeepField() throws IOException { } public void testMatchField_ImplicitFieldsList() throws IOException { - Map result = runEsql("from test | enrich countries | keep number | sort number"); + Map result = runEsql("from test1 | enrich countries | keep number | sort number"); var columns = List.of(Map.of("name", "number", "type", "long")); var values = List.of(List.of(1000), List.of(1000), List.of(5000)); assertMap(result, 
matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); } public void testMatchField_ImplicitFieldsList_WithStats() throws IOException { - Map result = runEsql("from test | enrich countries | stats s = sum(number) by country_name"); + Map result = runEsql("from test1 | enrich countries | stats s = sum(number) by country_name"); var columns = List.of(Map.of("name", "s", "type", "long"), Map.of("name", "country_name", "type", "keyword")); var values = List.of(List.of(2000, "United States of America"), List.of(5000, "China")); assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); } + public void testSimpleIndexFilteringWithEnrich() throws IOException { + // no filter + Map result = runEsql(""" + from test* metadata _index + | enrich countries + | keep *number, geo.dest, _index + | sort geo.dest, _index + """); + var columns = List.of( + Map.of("name", "country_number", "type", "long"), + Map.of("name", "number", "type", "long"), + Map.of("name", "geo.dest", "type", "keyword"), + Map.of("name", "_index", "type", "keyword") + ); + var values = List.of( + Arrays.asList(null, 5000, "CN", "test1"), + Arrays.asList(2, null, "IN", "test2"), + Arrays.asList(2, null, "IN", "test2"), + Arrays.asList(null, 1000, "US", "test1"), + Arrays.asList(null, 1000, "US", "test1"), + Arrays.asList(3, null, "US", "test2") + ); + assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + + // filter something that won't affect the columns + result = runEsql(""" + from test* metadata _index + | enrich countries + | keep *number, geo.dest, _index + | sort geo.dest, _index + """, b -> b.startObject("exists").field("field", "foobar").endObject()); + assertMap(result, matchesMap().entry("columns", columns).entry("values", List.of()).entry("took", greaterThanOrEqualTo(0))); + } + + public void 
testIndexFilteringWithEnrich_RemoveOneIndex() throws IOException { + // filter out test2 but specifically use one of its fields in the query (country_number) + Map result = runEsql(""" + from test* metadata _index + | enrich countries + | keep country_number, number, geo.dest, _index + | sort geo.dest, _index + """, b -> b.startObject("exists").field("field", "number").endObject()); + + var columns = List.of( + Map.of("name", "country_number", "type", "long"), + Map.of("name", "number", "type", "long"), + Map.of("name", "geo.dest", "type", "keyword"), + Map.of("name", "_index", "type", "keyword") + ); + var values = List.of( + Arrays.asList(null, 5000, "CN", "test1"), + Arrays.asList(null, 1000, "US", "test1"), + Arrays.asList(null, 1000, "US", "test1") + ); + + assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + + // filter out test2 and use a wildcarded field name in the "keep" command + result = runEsql(""" + from test* metadata _index + | enrich countries + | keep *number, geo.dest, _index + | sort geo.dest, _index + """, b -> b.startObject("exists").field("field", "number").endObject()); + + columns = List.of( + Map.of("name", "number", "type", "long"), + Map.of("name", "geo.dest", "type", "keyword"), + Map.of("name", "_index", "type", "keyword") + ); + values = List.of(Arrays.asList(5000, "CN", "test1"), Arrays.asList(1000, "US", "test1"), Arrays.asList(1000, "US", "test1")); + assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + } + + public void testIndexFilteringWithEnrich_ExpectException() throws IOException { + // no filter, just a simple query with "enrich" that should throw a valid VerificationException + ResponseException e = expectThrows(ResponseException.class, () -> runEsql(""" + from test* metadata _index + | enrich countries + | where foobar == 123 + """)); + assertEquals(400, 
e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 3:13: Unknown column [foobar]")); + + // same query, but with a filter this time + e = expectThrows(ResponseException.class, () -> runEsql(""" + from test* metadata _index + | enrich countries + | where foobar == 123 + """, b -> b.startObject("exists").field("field", "number").endObject())); + assertEquals(400, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getMessage(), containsString("Found 1 problem")); + assertThat(e.getMessage(), containsString("line 3:13: Unknown column [foobar]")); + } + + public void testIndexFilteringWithEnrich_FilterUnusedIndexFields() throws IOException { + // filter out "test1". The field that is specific to "test1" ("number") is not actually used in the query + Map result = runEsql(""" + from test* metadata _index + | enrich countries + | keep country_number, geo.dest, _index + | sort geo.dest, _index + """, b -> b.startObject("exists").field("field", "country_number").endObject()); + + var columns = List.of( + Map.of("name", "country_number", "type", "long"), + Map.of("name", "geo.dest", "type", "keyword"), + Map.of("name", "_index", "type", "keyword") + ); + var values = List.of(Arrays.asList(2, "IN", "test2"), Arrays.asList(2, "IN", "test2"), Arrays.asList(3, "US", "test2")); + assertMap(result, matchesMap().entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + } + private Map runEsql(String query) throws IOException { - return runEsql(query, mode); + return runEsql(query, null, mode); } - private Map runEsql(String query, Mode mode) throws IOException { - var requestObject = new RestEsqlTestCase.RequestObjectBuilder().query(query); + private Map runEsql(String query, CheckedConsumer filter) throws IOException { + return runEsql(query, filter, mode); + } + + private Map runEsql(String query, CheckedConsumer filter, 
Mode mode) throws IOException { + var requestObject = new RestEsqlTestCase.RequestObjectBuilder(); + if (filter != null) { + requestObject.filter(filter); + } + requestObject.query(query); if (mode == Mode.ASYNC) { return RestEsqlTestCase.runEsqlAsync(requestObject); } else { From 0b7111b9cfc1fba14022f37795e9f8bed786da28 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 26 Nov 2024 23:25:48 +0200 Subject: [PATCH 14/17] Don't run the tests in async as well; it's too unpredictable --- .../qa/multi_node/RequestIndexFilteringIT.java | 14 -------------- .../qa/single_node/RequestIndexFilteringIT.java | 14 -------------- .../qa/rest/RequestIndexFilteringTestCase.java | 8 +------- 3 files changed, 1 insertion(+), 35 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java index c74d305ba3bf3..c2ba502b92554 100644 --- a/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java +++ b/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/RequestIndexFilteringIT.java @@ -7,18 +7,13 @@ package org.elasticsearch.xpack.esql.qa.multi_node; -import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; import org.elasticsearch.test.TestClustersThreadFilter; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase; -import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; import org.junit.ClassRule; -import java.util.Arrays; -import java.util.List; - @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class RequestIndexFilteringIT extends 
RequestIndexFilteringTestCase { @@ -29,13 +24,4 @@ public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase { protected String getTestRestCluster() { return cluster.getHttpAddresses(); } - - @ParametersFactory(argumentFormatting = "%1s") - public static List modes() { - return Arrays.stream(RestEsqlTestCase.Mode.values()).map(m -> new Object[] { m }).toList(); - } - - public RequestIndexFilteringIT(RestEsqlTestCase.Mode mode) { - super(mode); - } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java index 5f8e67d14d8fd..f13bcd618f0a8 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RequestIndexFilteringIT.java @@ -7,18 +7,13 @@ package org.elasticsearch.xpack.esql.qa.single_node; -import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; import org.elasticsearch.test.TestClustersThreadFilter; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.xpack.esql.qa.rest.RequestIndexFilteringTestCase; -import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; import org.junit.ClassRule; -import java.util.Arrays; -import java.util.List; - @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase { @@ -29,13 +24,4 @@ public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase { protected String getTestRestCluster() { return cluster.getHttpAddresses(); } - - @ParametersFactory(argumentFormatting = "%1s") - 
public static List modes() { - return Arrays.stream(RestEsqlTestCase.Mode.values()).map(m -> new Object[] { m }).toList(); - } - - public RequestIndexFilteringIT(RestEsqlTestCase.Mode mode) { - super(mode); - } } diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java index e67ddc4c6a4c1..3314430d63eaa 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java @@ -38,12 +38,6 @@ public abstract class RequestIndexFilteringTestCase extends ESRestTestCase { - protected final RestEsqlTestCase.Mode mode; - - protected RequestIndexFilteringTestCase(RestEsqlTestCase.Mode mode) { - this.mode = mode; - } - @After public void wipeTestData() throws IOException { try { @@ -242,7 +236,7 @@ private static RestEsqlTestCase.RequestObjectBuilder existsFilter(String field) } public Map runEsql(RestEsqlTestCase.RequestObjectBuilder requestObject) throws IOException { - return RestEsqlTestCase.runEsql(requestObject, new AssertWarnings.NoWarnings(), mode); + return RestEsqlTestCase.runEsql(requestObject, new AssertWarnings.NoWarnings(), RestEsqlTestCase.Mode.SYNC); } protected void indexTimestampData(int docs, String indexName, String date, String differentiatorFieldName) throws IOException { From 1819f4579edf05d1c9f0fb01b8fa7f41e155a156 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 26 Nov 2024 23:34:27 +0200 Subject: [PATCH 15/17] Update docs/changelog/116755.yaml --- docs/changelog/116755.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/116755.yaml diff --git a/docs/changelog/116755.yaml b/docs/changelog/116755.yaml new file mode 100644 index 0000000000000..3aa5ec8580b59 
--- /dev/null +++ b/docs/changelog/116755.yaml @@ -0,0 +1,5 @@ +pr: 116755 +summary: Smarter field caps with subscribable listener +area: ES|QL +type: enhancement +issues: [] From ba3425c6ef2d2854919b15d5c7f996830080b483 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 3 Dec 2024 17:23:07 +0200 Subject: [PATCH 16/17] Address reviews --- .../xpack/esql/session/EsqlSession.java | 56 ++++++------------- 1 file changed, 16 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 506f62fc12b73..97a345580d20e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -81,7 +81,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -154,13 +153,13 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P analyzedPlan( parse(request.query(), request.params()), executionInfo, + request.filter(), new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener); } - }, - request.filter() + } ); } @@ -275,8 +274,8 @@ private LogicalPlan parse(String query, QueryParams params) { public void analyzedPlan( LogicalPlan parsed, EsqlExecutionInfo executionInfo, - ActionListener logicalPlanListener, - QueryBuilder requestFilter + QueryBuilder requestFilter, + ActionListener logicalPlanListener ) { if (parsed.analyzed()) { logicalPlanListener.onResponse(parsed); @@ -327,7 +326,7 @@ public void analyzedPlan( // invalid index resolution to 
updateExecutionInfo if (listenerResult.indices.isValid()) { // CCS indices and skip_unavailable cluster values can stop the analysis right here - if (analyzeCCSIndices(executionInfo, logicalPlanListener, l, listenerResult, targetClusters, unresolvedPolicies)) + if (analyzeCCSIndices(executionInfo, targetClusters, unresolvedPolicies, listenerResult, logicalPlanListener, l)) return; } // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step @@ -335,7 +334,7 @@ public void analyzedPlan( }) .andThen((l, listenerResult) -> { // first attempt (maybe the only one) at analyzing the plan - analyzeAndMaybeRetry(analyzeAction, requestFilter, listenerResult, l, logicalPlanListener); + analyzeAndMaybeRetry(analyzeAction, requestFilter, listenerResult, logicalPlanListener, l); }) .andThen((l, listenerResult) -> { assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; @@ -377,7 +376,7 @@ private void preAnalyzeLookupIndices(List indices, ListenerResult lis table.index(), listenerResult.fieldNames(), null, - new ListenerResultWrappingListener<>(listener, listenerResult, listenerResult::withLookupIndexResolution) + listener.map(indexResolution -> listenerResult.withLookupIndexResolution(indexResolution)) ); } else { try { @@ -456,7 +455,7 @@ private void preAnalyzeIndices( indexExpressionToResolve, listenerResult.fieldNames, requestFilter, - new ListenerResultWrappingListener<>(listener, listenerResult, listenerResult::withIndexResolution) + listener.map(indexResolution -> listenerResult.withIndexResolution(indexResolution)) ); } } else { @@ -478,11 +477,11 @@ private void preAnalyzeIndices( private boolean analyzeCCSIndices( EsqlExecutionInfo executionInfo, - ActionListener logicalPlanListener, - ActionListener l, - ListenerResult listenerResult, Set targetClusters, - Set unresolvedPolicies + Set unresolvedPolicies, + ListenerResult listenerResult, 
+ ActionListener logicalPlanListener, + ActionListener l ) { IndexResolution indexResolution = listenerResult.indices; EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -506,7 +505,7 @@ private boolean analyzeCCSIndices( enrichPolicyResolver.resolvePolicies( newClusters, unresolvedPolicies, - new ListenerResultWrappingListener<>(l, listenerResult, listenerResult::withEnrichResolution) + l.map(enrichResolution -> listenerResult.withEnrichResolution(enrichResolution)) ); return true; } @@ -517,8 +516,8 @@ private static void analyzeAndMaybeRetry( TriFunction analyzeAction, QueryBuilder requestFilter, ListenerResult listenerResult, - ActionListener l, - ActionListener logicalPlanListener + ActionListener logicalPlanListener, + ActionListener l ) { LogicalPlan plan = null; var filterPresentMessage = requestFilter == null ? "without" : "with"; @@ -538,16 +537,15 @@ private static void analyzeAndMaybeRetry( if (requestFilter == null) { // if the initial request didn't have a filter, then just pass the exception back to the user logicalPlanListener.onFailure(ve); - return; } else { // interested only in a VerificationException, but this time we are taking out the index filter // to try and make the index resolution work without any index filtering. In the next step... 
to be continued l.onResponse(listenerResult); } + return; } else { // if the query failed with any other type of exception, then just pass the exception back to the user logicalPlanListener.onFailure(e); - return; } } LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan); @@ -707,28 +705,6 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { return plan; } - class ListenerResultWrappingListener implements ActionListener { - private final ActionListener delegate; - private final ListenerResult result; - private final Function setter; - - ListenerResultWrappingListener(ActionListener delegate, ListenerResult result, Function setter) { - this.delegate = delegate; - this.result = result; - this.setter = setter; - } - - @Override - public void onResponse(T t) { - delegate.onResponse(setter.apply(t)); - } - - @Override - public void onFailure(Exception e) { - delegate.onFailure(e); - } - } - private record ListenerResult( IndexResolution indices, IndexResolution lookupIndices, From 2e0493d0bb9fec51c22e4155784a446d3faa8115 Mon Sep 17 00:00:00 2001 From: Andrei Stefan Date: Tue, 3 Dec 2024 17:52:06 +0200 Subject: [PATCH 17/17] Minor change --- .../java/org/elasticsearch/xpack/esql/session/EsqlSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a02afa6ccc6fa..71fba5683644d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -542,11 +542,11 @@ private static void analyzeAndMaybeRetry( // to try and make the index resolution work without any index filtering. In the next step... 
to be continued l.onResponse(listenerResult); } - return; } else { // if the query failed with any other type of exception, then just pass the exception back to the user logicalPlanListener.onFailure(e); } + return; } LOGGER.debug("Analyzed plan ({} attempt, {} filter):\n{}", attemptMessage, filterPresentMessage, plan); // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning