From aaed7001aea8fc01a506b09766043d899c42db5d Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 29 Nov 2019 14:20:31 +0200 Subject: [PATCH] [ML] Add optional source filtering during data frame reindexing (#49690) This adds a `_source` setting under the `source` setting of a data frame analytics config. The new `_source` is reusing the structure of a `FetchSourceContext` like `analyzed_fields` does. Specifying includes and excludes for source allows selecting which fields will get reindexed and will be available in the destination index. Closes #49531 --- .../dataframe/DataFrameAnalyticsSource.java | 30 +++- .../MlClientDocumentationIT.java | 3 + .../DataFrameAnalyticsSourceTests.java | 9 ++ .../ml/put-data-frame-analytics.asciidoc | 1 + .../apis/dfanalyticsresources.asciidoc | 32 ++-- .../apis/put-dfanalytics.asciidoc | 53 +++--- .../action/PutDataFrameAnalyticsAction.java | 20 +++ .../dataframe/DataFrameAnalyticsConfig.java | 2 +- .../dataframe/DataFrameAnalyticsSource.java | 86 +++++++++- .../persistence/ElasticsearchMappings.java | 3 + .../ml/job/results/ReservedFieldNames.java | 1 + ...tDataFrameAnalyticsActionRequestTests.java | 44 +++++ .../DataFrameAnalyticsSourceTests.java | 73 ++++++++- .../ml/qa/ml-with-security/build.gradle | 1 + .../ExplainDataFrameAnalyticsIT.java | 3 +- ...NativeDataFrameAnalyticsIntegTestCase.java | 2 +- .../integration/RunDataFrameAnalyticsIT.java | 6 +- ...ransportStartDataFrameAnalyticsAction.java | 2 +- .../ml/dataframe/DataFrameAnalyticsIndex.java | 6 +- .../dataframe/DataFrameAnalyticsManager.java | 1 + .../xpack/ml/dataframe/MappingsMerger.java | 26 +-- .../extractor/ExtractedFieldsDetector.java | 11 ++ .../DataFrameAnalyticsIndexTests.java | 2 +- .../ml/dataframe/MappingsMergerTests.java | 46 +++++- .../dataframe/SourceDestValidatorTests.java | 2 +- .../ExtractedFieldsDetectorTests.java | 153 +++++++++++------- .../AnalyticsResultProcessorTests.java | 2 +- .../test/ml/data_frame_analytics_crud.yml | 28 +++- 28 files changed, 521 insertions(+), 127 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java index 9a6de159bea3e..1f731f4c28aaa 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSource.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; import java.util.Arrays; @@ -44,20 +45,27 @@ public static Builder builder() { private static final ParseField INDEX = new ParseField("index"); private static final ParseField QUERY = new ParseField("query"); + public static final ParseField _SOURCE = new ParseField("_source"); private static ObjectParser PARSER = new ObjectParser<>("data_frame_analytics_source", true, Builder::new); static { PARSER.declareStringArray(Builder::setIndex, INDEX); PARSER.declareObject(Builder::setQueryConfig, (p, c) -> QueryConfig.fromXContent(p), QUERY); + PARSER.declareField(Builder::setSourceFiltering, + (p, c) -> FetchSourceContext.fromXContent(p), + _SOURCE, + ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING); } private final String[] index; private final QueryConfig queryConfig; + private final FetchSourceContext sourceFiltering; - private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig) { + private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering) { this.index = Objects.requireNonNull(index); this.queryConfig = queryConfig; + this.sourceFiltering = sourceFiltering; } public String[] getIndex() { @@ -68,6 +76,10 @@ public QueryConfig getQueryConfig() { return queryConfig; } + public FetchSourceContext getSourceFiltering() { + return sourceFiltering; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -75,6 +87,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (queryConfig != null) { builder.field(QUERY.getPreferredName(), queryConfig.getQuery()); } + if (sourceFiltering != null) { + builder.field(_SOURCE.getPreferredName(), sourceFiltering); + } builder.endObject(); return builder; } @@ -86,12 +101,13 @@ public boolean equals(Object o) { DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o; return Arrays.equals(index, other.index) - && Objects.equals(queryConfig, other.queryConfig); + && Objects.equals(queryConfig, other.queryConfig) + && Objects.equals(sourceFiltering, other.sourceFiltering); } @Override public int hashCode() { - return Objects.hash(Arrays.asList(index), queryConfig); + return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering); } @Override @@ -103,6 +119,7 @@ public static class Builder { private String[] index; private QueryConfig queryConfig; + private FetchSourceContext sourceFiltering; private Builder() {} @@ -121,8 +138,13 @@ public Builder setQueryConfig(QueryConfig queryConfig) { return this; } + public Builder setSourceFiltering(FetchSourceContext sourceFiltering) { + this.sourceFiltering = sourceFiltering; + return this; + } + public DataFrameAnalyticsSource build() { - return new DataFrameAnalyticsSource(index, queryConfig); + return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 8c6e134822b60..1d9a151cf8ae3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -2939,6 +2939,9 @@ public void testPutDataFrameAnalytics() throws Exception { DataFrameAnalyticsSource sourceConfig = DataFrameAnalyticsSource.builder() // <1> .setIndex("put-test-source-index") // <2> .setQueryConfig(queryConfig) // <3> + .setSourceFiltering(new FetchSourceContext(true, + new String[] { "included_field_1", "included_field_2" }, + new String[] { "excluded_field" })) // <4> .build(); // end::put-data-frame-analytics-source-config diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java index d82e1999f3034..3fae44aad9060 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsSourceTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.test.AbstractXContentTestCase; import java.io.IOException; @@ -35,9 +36,17 @@ public class DataFrameAnalyticsSourceTests extends AbstractXContentTestCase { public static DataFrameAnalyticsSource randomSourceConfig() { + FetchSourceContext sourceFiltering = null; + if (randomBoolean()) { + sourceFiltering = new FetchSourceContext(true, + generateRandomStringArray(10, 10, false, false), + generateRandomStringArray(10, 10, false, false)); + } + return DataFrameAnalyticsSource.builder() .setIndex(generateRandomStringArray(10, 10, false, false)) .setQueryConfig(randomBoolean() ? null : randomQueryConfig()) + .setSourceFiltering(sourceFiltering) .build(); } diff --git a/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc index c4e7184de7e04..91a97ad604cee 100644 --- a/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc +++ b/docs/java-rest/high-level/ml/put-data-frame-analytics.asciidoc @@ -52,6 +52,7 @@ include-tagged::{doc-tests-file}[{api}-source-config] <1> Constructing a new DataFrameAnalyticsSource <2> The source index <3> The query from which to gather the data. If query is not set, a `match_all` query is used by default. +<4> Source filtering to select which fields will exist in the destination index. ===== QueryConfig diff --git a/docs/reference/ml/df-analytics/apis/dfanalyticsresources.asciidoc b/docs/reference/ml/df-analytics/apis/dfanalyticsresources.asciidoc index 62b5b121528a5..e8ee463c66af7 100644 --- a/docs/reference/ml/df-analytics/apis/dfanalyticsresources.asciidoc +++ b/docs/reference/ml/df-analytics/apis/dfanalyticsresources.asciidoc @@ -16,17 +16,18 @@ <>. `analyzed_fields`:: - (object) You can specify both `includes` and/or `excludes` patterns. If - `analyzed_fields` is not set, only the relevant fields will be included. For - example, all the numeric fields for {oldetection}. For the supported field - types, see <>. + (Optional, object) Specify `includes` and/or `excludes` patterns to select + which fields will be included in the analysis. If `analyzed_fields` is not set, + only the relevant fields will be included. For example, all the numeric fields + for {oldetection}. For the supported field types, see <>. + Also see the <> which helps understand field selection. `includes`::: - (array) An array of strings that defines the fields that will be included in + (Optional, array) An array of strings that defines the fields that will be included in the analysis. `excludes`::: - (array) An array of strings that defines the fields that will be excluded + (Optional, array) An array of strings that defines the fields that will be excluded from the analysis. @@ -81,8 +82,8 @@ PUT _ml/data_frame/analytics/loganalytics that setting. For more information, see <>. `source`:: - (object) The source configuration consisting an `index` and optionally a - `query` object. + (object) The configuration of how to source the analysis data. It requires an `index`. + Optionally, `query` and `_source` may be specified. `index`::: (Required, string or array) Index or indices on which to perform the @@ -96,6 +97,19 @@ PUT _ml/data_frame/analytics/loganalytics as this object is passed verbatim to {es}. By default, this property has the following value: `{"match_all": {}}`. + `_source`::: + (Optional, object) Specify `includes` and/or `excludes` patterns to select + which fields will be present in the destination. Fields that are excluded + cannot be included in the analysis. + + `includes`:::: + (array) An array of strings that defines the fields that will be included in + the destination. + + `excludes`:::: + (array) An array of strings that defines the fields that will be excluded + from the destination. + [[dfanalytics-types]] ==== Analysis objects @@ -277,4 +291,4 @@ improvement. If you override any parameters, then the optimization will calculate the value of the remaining parameters accordingly and use the value you provided for the overridden parameter. The number of rounds are reduced respectively. The validation error is estimated in each round by using 4-fold -cross validation. \ No newline at end of file +cross validation. diff --git a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc index 159f0cb61a0c4..b4971fffa9c49 100644 --- a/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc @@ -101,13 +101,13 @@ single number. For example, in case of age ranges, you can model the values as <>. `analyzed_fields`:: - (Optional, object) You can specify both `includes` and/or `excludes` patterns. - If `analyzed_fields` is not set, only the relevant fields will be included. - For example, all the numeric fields for {oldetection}. For the supported field - types, see <>. If you specify fields – - either in `includes` or in `excludes` – that have a data type that is not - supported, an error occurs. - + (Optional, object) Specify `includes` and/or `excludes` patterns to select + which fields will be included in the analysis. If `analyzed_fields` is not set, + only the relevant fields will be included. For example, all the numeric fields + for {oldetection}. For the supported field types, see <>. + Also see the <> which helps understand + field selection. + `includes`::: (Optional, array) An array of strings that defines the fields that will be included in the analysis. @@ -142,20 +142,33 @@ single number. For example, in case of age ranges, you can model the values as that setting. For more information, see <>. `source`:: - (Required, object) The source configuration, consisting of `index` and - optionally a `query`. + (object) The configuration of how to source the analysis data. It requires an `index`. + Optionally, `query` and `_source` may be specified. - `index`::: - (Required, string or array) Index or indices on which to perform the - analysis. It can be a single index or index pattern as well as an array of - indices or patterns. - - `query`::: - (Optional, object) The {es} query domain-specific language - (<>). This value corresponds to the query object in an {es} - search POST body. All the options that are supported by {es} can be used, - as this object is passed verbatim to {es}. By default, this property has - the following value: `{"match_all": {}}`. + `index`::: + (Required, string or array) Index or indices on which to perform the + analysis. It can be a single index or index pattern as well as an array of + indices or patterns. + + `query`::: + (Optional, object) The {es} query domain-specific language + (<>). This value corresponds to the query object in an {es} + search POST body. All the options that are supported by {es} can be used, + as this object is passed verbatim to {es}. By default, this property has + the following value: `{"match_all": {}}`. + + `_source`::: + (Optional, object) Specify `includes` and/or `excludes` patterns to select + which fields will be present in the destination. Fields that are excluded + cannot be included in the analysis. + + `includes`:::: + (array) An array of strings that defines the fields that will be included in + the destination. + + `excludes`:::: + (array) An array of strings that defines the fields that will be excluded + from the destination. `allow_lazy_start`:: (Optional, boolean) Whether this job should be allowed to start when there diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsAction.java index 5bce41d8a4ae6..4f4ddc388aed7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsAction.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; @@ -18,6 +19,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import java.io.IOException; @@ -87,6 +89,24 @@ public DataFrameAnalyticsConfig getConfig() { @Override public ActionRequestValidationException validate() { + ActionRequestValidationException error = null; + error = checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(config, error); + return error; + } + + private ActionRequestValidationException checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering( + DataFrameAnalyticsConfig config, ActionRequestValidationException error) { + if (config.getAnalyzedFields() == null) { + return null; + } + for (String analyzedInclude : config.getAnalyzedFields().includes()) { + if (config.getSource().isFieldExcluded(analyzedInclude)) { + return ValidateActions.addValidationError("field [" + analyzedInclude + "] is included in [" + + DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName() + "] but not in [" + + DataFrameAnalyticsConfig.SOURCE.getPreferredName() + "." + + DataFrameAnalyticsSource._SOURCE.getPreferredName() + "]", error); + } + } return null; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java index ac1589fa56fbc..9fd7f8aa86fcb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java @@ -127,7 +127,7 @@ private static DataFrameAnalysis parseAnalysis(XContentParser parser, boolean ig private final Version version; private final boolean allowLazyStart; - public DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest, + private DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest, DataFrameAnalysis analysis, Map headers, ByteSizeValue modelMemoryLimit, FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart) { this.id = ExceptionsHelper.requireNonNull(id, ID); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java index 5ffa3119413ab..c5e5515deb0b5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSource.java @@ -6,17 +6,21 @@ package org.elasticsearch.xpack.core.ml.dataframe; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; @@ -33,20 +37,29 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject { public static final ParseField INDEX = new ParseField("index"); public static final ParseField QUERY = new ParseField("query"); + public static final ParseField _SOURCE = new ParseField("_source"); public static ConstructingObjectParser createParser(boolean ignoreUnknownFields) { ConstructingObjectParser parser = new ConstructingObjectParser<>("data_frame_analytics_source", - ignoreUnknownFields, a -> new DataFrameAnalyticsSource(((List) a[0]).toArray(new String[0]), (QueryProvider) a[1])); + ignoreUnknownFields, a -> new DataFrameAnalyticsSource( + ((List) a[0]).toArray(new String[0]), + (QueryProvider) a[1], + (FetchSourceContext) a[2])); parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX); parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT), QUERY); + parser.declareField(ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> FetchSourceContext.fromXContent(p), + _SOURCE, + ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING); return parser; } private final String[] index; private final QueryProvider queryProvider; + private final FetchSourceContext sourceFiltering; - public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider) { + public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering) { this.index = ExceptionsHelper.requireNonNull(index, INDEX); if (index.length == 0) { throw new IllegalArgumentException("source.index must specify at least one index"); @@ -55,22 +68,36 @@ public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryPro throw new IllegalArgumentException("source.index must contain non-null and non-empty strings"); } this.queryProvider = queryProvider == null ? QueryProvider.defaultQuery() : queryProvider; + if (sourceFiltering != null && sourceFiltering.fetchSource() == false) { + throw new IllegalArgumentException("source._source cannot be disabled"); + } + this.sourceFiltering = sourceFiltering; } public DataFrameAnalyticsSource(StreamInput in) throws IOException { index = in.readStringArray(); queryProvider = QueryProvider.fromStream(in); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + sourceFiltering = in.readOptionalWriteable(FetchSourceContext::new); + } else { + sourceFiltering = null; + } } public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) { this.index = Arrays.copyOf(other.index, other.index.length); this.queryProvider = new QueryProvider(other.queryProvider); + this.sourceFiltering = other.sourceFiltering == null ? null : new FetchSourceContext( + other.sourceFiltering.fetchSource(), other.sourceFiltering.includes(), other.sourceFiltering.excludes()); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(index); queryProvider.writeTo(out); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(sourceFiltering); + } } @Override @@ -78,6 +105,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.array(INDEX.getPreferredName(), index); builder.field(QUERY.getPreferredName(), queryProvider.getQuery()); + if (sourceFiltering != null) { + builder.field(_SOURCE.getPreferredName(), sourceFiltering); + } builder.endObject(); return builder; } @@ -89,12 +119,13 @@ public boolean equals(Object o) { DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o; return Arrays.equals(index, other.index) - && Objects.equals(queryProvider, other.queryProvider); + && Objects.equals(queryProvider, other.queryProvider) + && Objects.equals(sourceFiltering, other.sourceFiltering); } @Override public int hashCode() { - return Objects.hash(Arrays.asList(index), queryProvider); + return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering); } public String[] getIndex() { @@ -118,6 +149,10 @@ public QueryBuilder getParsedQuery() { return queryProvider.getParsedQuery(); } + public FetchSourceContext getSourceFiltering() { + return sourceFiltering; + } + Exception getQueryParsingException() { return queryProvider.getParsingException(); } @@ -147,4 +182,47 @@ public List getQueryDeprecations(NamedXContentRegistry namedXContentRegi Map getQuery() { return queryProvider.getQuery(); } + + public boolean isFieldExcluded(String path) { + if (sourceFiltering == null) { + return false; + } + + // First we check in the excludes as they are applied last + for (String exclude : sourceFiltering.excludes()) { + if (pathMatchesSourcePattern(path, exclude)) { + return true; + } + } + + // Now we can check the includes + + // Empty includes means no further exclusions + if (sourceFiltering.includes().length == 0) { + return false; + } + + for (String include : sourceFiltering.includes()) { + if (pathMatchesSourcePattern(path, include)) { + return false; + } + } + return true; + } + + private static boolean pathMatchesSourcePattern(String path, String sourcePattern) { + if (sourcePattern.equals(path)) { + return true; + } + + if (Regex.isSimpleMatchPattern(sourcePattern)) { + return Regex.simpleMatch(sourcePattern, path); + } + + // At this stage sourcePattern is a concrete field name and path is not equal to it. + // We should check if path is a nested field of pattern. + // Let us take "foo" as an example. + // Fields that are "foo.*" should also be matched. + return Regex.simpleMatch(sourcePattern + ".*", path); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 95d8194397a54..b9de87ef93de0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -418,6 +418,9 @@ public static void addDataFrameAnalyticsFields(XContentBuilder builder) throws I .startObject(DataFrameAnalyticsSource.QUERY.getPreferredName()) .field(ENABLED, false) .endObject() + .startObject(DataFrameAnalyticsSource._SOURCE.getPreferredName()) + .field(ENABLED, false) + .endObject() .endObject() .endObject() .startObject(DataFrameAnalyticsConfig.DEST.getPreferredName()) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index e0ae183d17a0f..8eacdcb0e78e4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -303,6 +303,7 @@ public final class ReservedFieldNames { DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(), DataFrameAnalyticsSource.INDEX.getPreferredName(), DataFrameAnalyticsSource.QUERY.getPreferredName(), + DataFrameAnalyticsSource._SOURCE.getPreferredName(), OutlierDetection.NAME.getPreferredName(), OutlierDetection.N_NEIGHBORS.getPreferredName(), OutlierDetection.METHOD.getPreferredName(), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java index dbd3db927503c..9d194bc260523 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PutDataFrameAnalyticsActionRequestTests.java @@ -11,16 +11,25 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction.Request; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests; import org.junit.Before; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + public class PutDataFrameAnalyticsActionRequestTests extends AbstractSerializingTestCase { private String id; @@ -65,4 +74,39 @@ protected boolean supportsUnknownFields() { protected Request doParseInstance(XContentParser parser) { return Request.parseRequest(id, parser); } + + public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsExcludedInSourceFiltering() { + DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null, + new FetchSourceContext(true, null, new String[] {"excluded"})); + FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"excluded"}, null); + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("foo") + .setSource(source) + .setAnalysis(OutlierDetectionTests.createRandom()) + .setAnalyzedFields(analyzedFields) + .buildForExplain(); + Request request = new Request(config); + + Exception e = request.validate(); + + assertThat(e, is(notNullValue())); + assertThat(e.getMessage(), containsString("field [excluded] is included in [analyzed_fields] but not in [source._source]")); + } + + public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsIncludedInSourceFiltering() { + DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null, + new FetchSourceContext(true, new String[] {"included"}, null)); + FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"included"}, null); + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId("foo") + .setSource(source) + .setAnalysis(OutlierDetectionTests.createRandom()) + .setAnalyzedFields(analyzedFields) + .buildForExplain(); + Request request = new Request(config); + + Exception e = request.validate(); + + assertThat(e, is(nullValue())); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java index 36c4774baa465..cd58f9c84533c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsSourceTests.java @@ -12,12 +12,18 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Arrays; import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; public class DataFrameAnalyticsSourceTests extends AbstractSerializingTestCase { @@ -46,6 +52,7 @@ protected DataFrameAnalyticsSource createTestInstance() { public static DataFrameAnalyticsSource createRandom() { String[] index = generateRandomStringArray(10, 10, false, false); QueryProvider queryProvider = null; + FetchSourceContext sourceFiltering = null; if (randomBoolean()) { try { queryProvider = QueryProvider.fromParsedQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10))); @@ -54,11 +61,75 @@ public static DataFrameAnalyticsSource createRandom() { throw new UncheckedIOException(e); } } - return new DataFrameAnalyticsSource(index, queryProvider); + if (randomBoolean()) { + sourceFiltering = new FetchSourceContext(true, + generateRandomStringArray(10, 10, false, false), + generateRandomStringArray(10, 10, false, false)); + } + return new DataFrameAnalyticsSource(index, queryProvider, sourceFiltering); } @Override protected Writeable.Reader instanceReader() { return DataFrameAnalyticsSource::new; } + + public void testConstructor_GivenDisabledSource() { + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new DataFrameAnalyticsSource( + new String[] {"index"}, null, new FetchSourceContext(false, null, null))); + assertThat(e.getMessage(), equalTo("source._source cannot be disabled")); + } + + public void testIsFieldExcluded_GivenNoSourceFiltering() { + DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, null); + assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false)); + } + + public void testIsFieldExcluded_GivenSourceFilteringWithNulls() { + DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, + new FetchSourceContext(true, null, null)); + assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false)); + } + + public void testIsFieldExcluded_GivenExcludes() { + assertThat(newSourceWithExcludes("foo").isFieldExcluded("bar"), is(false)); + assertThat(newSourceWithExcludes("foo").isFieldExcluded("foo"), is(true)); + assertThat(newSourceWithExcludes("foo").isFieldExcluded("foo.bar"), is(true)); + assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foo"), is(true)); + assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foobar"), is(true)); + assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foo.bar"), is(true)); + assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foo*"), is(true)); + assertThat(newSourceWithExcludes("foo*").isFieldExcluded("fo*"), is(false)); + } + + public void testIsFieldExcluded_GivenIncludes() { + assertThat(newSourceWithIncludes("foo").isFieldExcluded("bar"), is(true)); + assertThat(newSourceWithIncludes("foo").isFieldExcluded("foo"), is(false)); + assertThat(newSourceWithIncludes("foo").isFieldExcluded("foo.bar"), is(false)); + assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foo"), is(false)); + assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foobar"), is(false)); + assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foo.bar"), is(false)); + assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foo*"), is(false)); + assertThat(newSourceWithIncludes("foo*").isFieldExcluded("fo*"), is(true)); + } + + public void testIsFieldExcluded_GivenIncludesAndExcludes() { + // Excludes take precedence + assertThat(newSourceWithIncludesExcludes(Collections.singletonList("foo"), Collections.singletonList("foo")) + .isFieldExcluded("foo"), is(true)); + } + + private static DataFrameAnalyticsSource newSourceWithIncludes(String... includes) { + return newSourceWithIncludesExcludes(Arrays.asList(includes), Collections.emptyList()); + } + + private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes) { + return newSourceWithIncludesExcludes(Collections.emptyList(), Arrays.asList(excludes)); + } + + private static DataFrameAnalyticsSource newSourceWithIncludesExcludes(List includes, List excludes) { + FetchSourceContext sourceFiltering = new FetchSourceContext(true, + includes.toArray(new String[0]), excludes.toArray(new String[0])); + return new DataFrameAnalyticsSource(new String[] { "index" } , null, sourceFiltering); + } } diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index 38beb1d1908c1..fe53f1ca39ff5 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -52,6 +52,7 @@ integTest.runner { 'ml/data_frame_analytics_crud/Test put config with dest index included in source via alias', 'ml/data_frame_analytics_crud/Test put config with unknown top level field', 'ml/data_frame_analytics_crud/Test put config with unknown field in outlier detection analysis', + 'ml/data_frame_analytics_crud/Test put config given analyzed_fields include field excluded by source', 'ml/data_frame_analytics_crud/Test put config given missing source', 'ml/data_frame_analytics_crud/Test put config given source with empty index array', 'ml/data_frame_analytics_crud/Test put config given source with empty string in index array', diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java index 6796e3b7223d7..540d9f373b7e4 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java @@ -53,7 +53,8 @@ public void testSourceQueryIsApplied() throws IOException { DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, - QueryProvider.fromParsedQuery(QueryBuilders.termQuery("categorical", "only-one")))) + QueryProvider.fromParsedQuery(QueryBuilders.termQuery("categorical", "only-one")), + null)) .setAnalysis(new Classification("categorical")) .buildForExplain(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 06c88a9793b23..b3b58a2d4fcbb 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -164,7 +164,7 @@ protected static DataFrameAnalyticsConfig buildAnalytics(String id, String sourc @Nullable String resultsField, DataFrameAnalysis analysis) { return new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null)) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null)) .setDest(new DataFrameAnalyticsDest(destIndex, resultsField)) .setAnalysis(analysis) .build(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index c30fccce2f17a..2628a751bc112 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -356,7 +356,7 @@ public void testOutlierDetectionWithMultipleSourceIndices() throws Exception { String id = "test_outlier_detection_with_multiple_source_indices"; DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(sourceIndex, null)) + .setSource(new DataFrameAnalyticsSource(sourceIndex, null, null)) .setDest(new DataFrameAnalyticsDest(destIndex, null)) .setAnalysis(new OutlierDetection.Builder().build()) .build(); @@ -472,7 +472,7 @@ public void testModelMemoryLimitLowerThanEstimatedMemoryUsage() throws Exception ByteSizeValue modelMemoryLimit = new ByteSizeValue(1, ByteSizeUnit.MB); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null)) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null)) .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) .setAnalysis(new OutlierDetection.Builder().build()) .setModelMemoryLimit(modelMemoryLimit) @@ -516,7 +516,7 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws ByteSizeValue modelMemoryLimit = new ByteSizeValue(1, ByteSizeUnit.TB); DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() .setId(id) - .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null)) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null)) .setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null)) .setAnalysis(new OutlierDetection.Builder().build()) .setModelMemoryLimit(modelMemoryLimit) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index f76014eda1910..65ed0b93aa7b9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -237,7 +237,7 @@ private void getStartContext(String id, ActionListener finalListen // Step 5. Validate mappings can be merged ActionListener toValidateMappingsListener = ActionListener.wrap( startContext -> MappingsMerger.mergeMappings(client, startContext.config.getHeaders(), - startContext.config.getSource().getIndex(), ActionListener.wrap( + startContext.config.getSource(), ActionListener.wrap( mappings -> validateMappingsMergeListener.onResponse(startContext), finalListener::onFailure)), finalListener::onFailure ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndex.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndex.java index 444b0081cc7bc..a369bc7d0b09a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndex.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndex.java @@ -84,8 +84,6 @@ private static void prepareCreateIndexRequest(Client client, Clock clock, DataFr ActionListener listener) { AtomicReference settingsHolder = new AtomicReference<>(); - String[] sourceIndex = config.getSource().getIndex(); - ActionListener> mappingsListener = ActionListener.wrap( mappings -> listener.onResponse(createIndexRequest(clock, config, settingsHolder.get(), mappings)), listener::onFailure @@ -94,7 +92,7 @@ private static void prepareCreateIndexRequest(Client client, Clock clock, DataFr ActionListener settingsListener = ActionListener.wrap( settings -> { settingsHolder.set(settings); - MappingsMerger.mergeMappings(client, config.getHeaders(), sourceIndex, mappingsListener); + MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource(), mappingsListener); }, listener::onFailure ); @@ -105,7 +103,7 @@ private static void prepareCreateIndexRequest(Client client, Clock clock, DataFr ); GetSettingsRequest getSettingsRequest = new GetSettingsRequest(); - getSettingsRequest.indices(sourceIndex); + getSettingsRequest.indices(config.getSource().getIndex()); getSettingsRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); getSettingsRequest.names(PRESERVED_SETTINGS); ClientHelper.executeWithHeadersAsync(config.getHeaders(), ML_ORIGIN, client, GetSettingsAction.INSTANCE, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index c53238dc425d7..76fc588027943 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -179,6 +179,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF ReindexRequest reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices(config.getSource().getIndex()); reindexRequest.setSourceQuery(config.getSource().getParsedQuery()); + reindexRequest.getSearchRequest().source().fetchSource(config.getSource().getSourceFiltering()); reindexRequest.setDestIndex(config.getDest().getIndex()); reindexRequest.setScript(new Script("ctx._source." + DataFrameAnalyticsIndex.ID_COPY + " = ctx._id")); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/MappingsMerger.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/MappingsMerger.java index c573f193cf01a..056f8239aef8d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/MappingsMerger.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/MappingsMerger.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.util.Collections; @@ -32,22 +33,22 @@ public final class MappingsMerger { private MappingsMerger() {} - public static void mergeMappings(Client client, Map headers, String[] index, + public static void mergeMappings(Client client, Map headers, DataFrameAnalyticsSource source, ActionListener> listener) { ActionListener mappingsListener = ActionListener.wrap( - getMappingsResponse -> listener.onResponse(MappingsMerger.mergeMappings(getMappingsResponse)), + getMappingsResponse -> listener.onResponse(MappingsMerger.mergeMappings(source, getMappingsResponse)), listener::onFailure ); GetMappingsRequest getMappingsRequest = new GetMappingsRequest(); - getMappingsRequest.indices(index); + getMappingsRequest.indices(source.getIndex()); ClientHelper.executeWithHeadersAsync(headers, ML_ORIGIN, client, GetMappingsAction.INSTANCE, getMappingsRequest, mappingsListener); } - static ImmutableOpenMap mergeMappings(GetMappingsResponse getMappingsResponse) { + static ImmutableOpenMap mergeMappings(DataFrameAnalyticsSource source, + GetMappingsResponse getMappingsResponse) { ImmutableOpenMap indexToMappings = getMappingsResponse.getMappings(); - String type = null; Map mergedMappings = new HashMap<>(); Iterator> iterator = indexToMappings.iterator(); @@ -61,13 +62,16 @@ static ImmutableOpenMap mergeMappings(GetMappingsRespon Map fieldMappings = (Map) currentMappings.get("properties"); for (Map.Entry fieldMapping : fieldMappings.entrySet()) { - if (mergedMappings.containsKey(fieldMapping.getKey())) { - if (mergedMappings.get(fieldMapping.getKey()).equals(fieldMapping.getValue()) == false) { - throw ExceptionsHelper.badRequestException("cannot merge mappings because of differences for field [{}]", - fieldMapping.getKey()); + String field = fieldMapping.getKey(); + if (source.isFieldExcluded(field) == false) { + if (mergedMappings.containsKey(field)) { + if (mergedMappings.get(field).equals(fieldMapping.getValue()) == false) { + throw ExceptionsHelper.badRequestException( + "cannot merge mappings because of differences for field [{}]", field); + } + } else { + mergedMappings.put(field, fieldMapping.getValue()); } - } else { - mergedMappings.put(fieldMapping.getKey(), fieldMapping.getValue()); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java index 42ea2d1944301..62184e290374d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java @@ -85,6 +85,7 @@ private Set getIncludedFields(Set fieldSelection) { fields.removeAll(IGNORE_FIELDS); checkResultsFieldIsNotPresent(); removeFieldsUnderResultsField(fields); + applySourceFiltering(fields); FetchSourceContext analyzedFields = config.getAnalyzedFields(); // If the user has not explicitly included fields we'll include all compatible fields @@ -132,6 +133,16 @@ private void checkResultsFieldIsNotPresent() { } } + private void applySourceFiltering(Set fields) { + Iterator fieldsIterator = fields.iterator(); + while (fieldsIterator.hasNext()) { + String field = fieldsIterator.next(); + if (config.getSource().isFieldExcluded(field)) { + fieldsIterator.remove(); + } + } + } + private void addExcludedField(String field, String reason, Set fieldSelection) { fieldSelection.add(FieldSelection.excluded(field, getMappingTypes(field), reason)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java index 063e1ea337782..950d5997a5a35 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsIndexTests.java @@ -58,7 +58,7 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase { private static final DataFrameAnalyticsConfig ANALYTICS_CONFIG = new DataFrameAnalyticsConfig.Builder() .setId(ANALYTICS_ID) - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, null)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, null)) .setAnalysis(new OutlierDetection.Builder().build()) .build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java index f44e8a9f3e61a..e47b3e6934e80 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/MappingsMergerTests.java @@ -10,9 +10,10 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; -import java.io.IOException; import java.util.Map; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -21,7 +22,7 @@ public class MappingsMergerTests extends ESTestCase { - public void testMergeMappings_GivenIndicesWithIdenticalMappings() throws IOException { + public void testMergeMappings_GivenIndicesWithIdenticalMappings() { Map index1Mappings = Map.of("properties", Map.of("field_1", "field_1_mappings", "field_2", "field_2_mappings")); MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings); @@ -34,14 +35,14 @@ public void testMergeMappings_GivenIndicesWithIdenticalMappings() throws IOExcep GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build()); - ImmutableOpenMap mergedMappings = MappingsMerger.mergeMappings(getMappingsResponse); + ImmutableOpenMap mergedMappings = MappingsMerger.mergeMappings(newSource(), getMappingsResponse); assertThat(mergedMappings.size(), equalTo(1)); assertThat(mergedMappings.containsKey("_doc"), is(true)); assertThat(mergedMappings.valuesIt().next().getSourceAsMap(), equalTo(index1Mappings)); } - public void testMergeMappings_GivenFieldWithDifferentMapping() throws IOException { + public void testMergeMappings_GivenFieldWithDifferentMapping() { Map index1Mappings = Map.of("properties", Map.of("field_1", "field_1_mappings")); MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings); @@ -55,12 +56,12 @@ public void testMergeMappings_GivenFieldWithDifferentMapping() throws IOExceptio GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> MappingsMerger.mergeMappings(getMappingsResponse)); + () -> MappingsMerger.mergeMappings(newSource(), getMappingsResponse)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("cannot merge mappings because of differences for field [field_1]")); } - public void testMergeMappings_GivenIndicesWithDifferentMappingsButNoConflicts() throws IOException { + public void testMergeMappings_GivenIndicesWithDifferentMappingsButNoConflicts() { Map index1Mappings = Map.of("properties", Map.of("field_1", "field_1_mappings", "field_2", "field_2_mappings")); MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings); @@ -75,7 +76,7 @@ public void testMergeMappings_GivenIndicesWithDifferentMappingsButNoConflicts() GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build()); - ImmutableOpenMap mergedMappings = MappingsMerger.mergeMappings(getMappingsResponse); + ImmutableOpenMap mergedMappings = MappingsMerger.mergeMappings(newSource(), getMappingsResponse); assertThat(mergedMappings.size(), equalTo(1)); assertThat(mergedMappings.containsKey("_doc"), is(true)); @@ -92,4 +93,35 @@ public void testMergeMappings_GivenIndicesWithDifferentMappingsButNoConflicts() assertThat(fieldMappings.get("field_2"), equalTo("field_2_mappings")); assertThat(fieldMappings.get("field_3"), equalTo("field_3_mappings")); } + + public void testMergeMappings_GivenSourceFiltering() { + Map indexMappings = Map.of("properties", Map.of("field_1", "field_1_mappings", "field_2", "field_2_mappings")); + MappingMetaData indexMappingMetaData = new MappingMetaData("_doc", indexMappings); + + ImmutableOpenMap.Builder mappings = ImmutableOpenMap.builder(); + mappings.put("index", indexMappingMetaData); + + GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build()); + + ImmutableOpenMap mergedMappings = MappingsMerger.mergeMappings( + newSourceWithExcludes("field_1"), getMappingsResponse); + + assertThat(mergedMappings.size(), equalTo(1)); + assertThat(mergedMappings.containsKey("_doc"), is(true)); + Map mappingsAsMap = mergedMappings.valuesIt().next().getSourceAsMap(); + @SuppressWarnings("unchecked") + Map fieldMappings = (Map) mappingsAsMap.get("properties"); + + assertThat(fieldMappings.size(), equalTo(1)); + assertThat(fieldMappings.containsKey("field_2"), is(true)); + } + + private static DataFrameAnalyticsSource newSource() { + return new DataFrameAnalyticsSource(new String[] {"index"}, null, null); + } + + private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes) { + return new DataFrameAnalyticsSource(new String[] {"index"}, null, + new FetchSourceContext(true, null, excludes)); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java index c9423aadbe03a..ec8b97942ac0a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/SourceDestValidatorTests.java @@ -183,6 +183,6 @@ public void testCheck_GivenDestIndexIsAliasThatIsIncludedInSource() { } private static DataFrameAnalyticsSource createSource(String... index) { - return new DataFrameAnalyticsSource(index, null); + return new DataFrameAnalyticsSource(index, null, null); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java index 25553627a9e05..f4f25bcfa0636 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java @@ -45,6 +45,9 @@ public class ExtractedFieldsDetectorTests extends ESTestCase { private static final String DEST_INDEX = "dest_index"; private static final String RESULTS_FIELD = "ml"; + private FetchSourceContext sourceFiltering; + private FetchSourceContext analyzedFields; + public void testDetect_GivenFloatField() { FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() .addAggregatableField("some_float", "float").build(); @@ -86,8 +89,8 @@ public void testDetect_GivenOutlierDetectionAndNonNumericField() { .addAggregatableField("some_keyword", "keyword").build(); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]." + " Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short].")); @@ -99,7 +102,7 @@ public void testDetect_GivenOutlierDetectionAndFieldWithNumericAndNonNumericType ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " + "Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short].")); @@ -171,7 +174,7 @@ public void testDetect_GivenRegressionAndRequiredFieldMissing() { ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]")); } @@ -183,11 +186,11 @@ public void testDetect_GivenRegressionAndRequiredFieldExcluded() { .addAggregatableField("some_keyword", "keyword") .addAggregatableField("foo", "float") .build(); - FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[0], new String[] {"foo"}); + analyzedFields = new FetchSourceContext(true, new String[0], new String[] {"foo"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]")); } @@ -199,11 +202,11 @@ public void testDetect_GivenRegressionAndRequiredFieldNotIncluded() { .addAggregatableField("some_keyword", "keyword") .addAggregatableField("foo", "float") .build(); - FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"some_float", "some_keyword"}, new String[0]); + analyzedFields = new FetchSourceContext(true, new String[] {"some_float", "some_keyword"}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]")); } @@ -213,10 +216,10 @@ public void testDetect_GivenFieldIsBothIncludedAndExcluded() { .addAggregatableField("foo", "float") .addAggregatableField("bar", "float") .build(); - FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"foo", "bar"}, new String[] {"foo"}); + analyzedFields = new FetchSourceContext(true, new String[] {"foo", "bar"}, new String[] {"foo"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); Tuple> fieldExtraction = extractedFieldsDetector.detect(); List allFields = fieldExtraction.v1().getAllFields(); @@ -239,7 +242,7 @@ public void testDetect_GivenRegressionAndRequiredFieldHasInvalidType() { ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("invalid types [keyword] for required field [foo]; " + "expected types are [byte, double, float, half_float, integer, long, scaled_float, short]")); @@ -255,7 +258,7 @@ public void testDetect_GivenClassificationAndRequiredFieldHasInvalidType() { ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( SOURCE_INDEX, buildClassificationConfig("some_float"), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("invalid types [float] for required field [some_float]; " + "expected types are [boolean, byte, integer, ip, keyword, long, short, text]")); @@ -270,7 +273,7 @@ public void testDetect_GivenClassificationAndDependentVariableHasInvalidCardinal ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(SOURCE_INDEX, buildClassificationConfig("some_keyword"), false, 100, fieldCapabilities, Collections.singletonMap("some_keyword", 3L)); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("Field [some_keyword] must have at most [2] distinct values but there were at least [3]")); } @@ -281,7 +284,7 @@ public void testDetect_GivenIgnoredField() { ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " + "Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short].")); @@ -291,11 +294,11 @@ public void testDetect_GivenIncludedIgnoredField() { FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() .addAggregatableField("_id", "float") .build(); - FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{"_id"}, new String[0]); + analyzedFields = new FetchSourceContext(true, new String[]{"_id"}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("No field [_id] could be detected")); } @@ -304,11 +307,11 @@ public void testDetect_GivenExcludedFieldIsMissing() { FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() .addAggregatableField("foo", "float") .build(); - FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{"*"}, new String[] {"bar"}); + analyzedFields = new FetchSourceContext(true, new String[]{"*"}, new String[] {"bar"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("No field [bar] could be detected")); } @@ -318,10 +321,10 @@ public void testDetect_GivenExcludedFieldIsUnsupported() { .addAggregatableField("numeric", "float") .addAggregatableField("categorical", "keyword") .build(); - FetchSourceContext analyzedFields = new FetchSourceContext(true, null, new String[] {"categorical"}); + analyzedFields = new FetchSourceContext(true, null, new String[] {"categorical"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); Tuple> fieldExtraction = extractedFieldsDetector.detect(); @@ -366,11 +369,11 @@ public void testDetect_GivenIncludeWithMissingField() { .addAggregatableField("my_field2", "float") .build(); - FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]); + analyzedFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("No field [your_field1] could be detected")); } @@ -381,11 +384,11 @@ public void testDetect_GivenExcludeAllValidFields() { .addAggregatableField("my_field2", "float") .build(); - FetchSourceContext desiredFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"}); + analyzedFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " + "Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short].")); } @@ -397,10 +400,10 @@ public void testDetect_GivenInclusionsAndExclusions() { .addAggregatableField("your_field2", "float") .build(); - FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"}); + analyzedFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); Tuple> fieldExtraction = extractedFieldsDetector.detect(); List extractedFieldNames = fieldExtraction.v1().getAllFields().stream().map(ExtractedField::getName) @@ -422,11 +425,11 @@ public void testDetect_GivenIncludedFieldHasUnsupportedType() { .addAggregatableField("your_keyword", "keyword") .build(); - FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"}); + analyzedFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"}); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("field [your_keyword] has unsupported type [keyword]. " + "Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short].")); @@ -442,7 +445,7 @@ public void testDetect_GivenIndexContainsResultsField() { ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " + "please set a different results_field")); @@ -479,11 +482,11 @@ public void testDetect_GivenIncludedResultsField() { .addAggregatableField("your_field2", "float") .addAggregatableField("your_keyword", "keyword") .build(); - FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]); + analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " + "please set a different results_field")); @@ -496,11 +499,11 @@ public void testDetect_GivenIncludedResultsFieldAndTaskIsRestarting() { .addAggregatableField("your_field2", "float") .addAggregatableField("your_keyword", "keyword") .build(); - FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]); + analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), true, 100, fieldCapabilities, Collections.emptyMap()); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect()); + SOURCE_INDEX, buildOutlierDetectionConfig(), true, 100, fieldCapabilities, Collections.emptyMap()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect); assertThat(e.getMessage(), equalTo("No field [ml] could be detected")); } @@ -814,10 +817,10 @@ public void testDetect_GivenMultiFields_AndExplicitlyIncludedFields() { .addAggregatableField("field_1.keyword", "keyword") .addAggregatableField("field_2", "float") .build(); - FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] { "field_1", "field_2" }, new String[0]); + analyzedFields = new FetchSourceContext(true, new String[] { "field_1", "field_2" }, new String[0]); ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( - SOURCE_INDEX, buildRegressionConfig("field_2", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap()); + SOURCE_INDEX, buildRegressionConfig("field_2"), false, 100, fieldCapabilities, Collections.emptyMap()); Tuple> fieldExtraction = extractedFieldsDetector.detect(); assertThat(fieldExtraction.v1().getAllFields().size(), equalTo(2)); @@ -832,38 +835,76 @@ public void testDetect_GivenMultiFields_AndExplicitlyIncludedFields() { ); } - private static DataFrameAnalyticsConfig buildOutlierDetectionConfig() { - return buildOutlierDetectionConfig(null); + public void testDetect_GivenSourceFilteringWithIncludes() { + FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() + .addAggregatableField("field_11", "float") + .addAggregatableField("field_12", "float") + .addAggregatableField("field_21", "float") + .addAggregatableField("field_22", "float").build(); + + sourceFiltering = new FetchSourceContext(true, new String[] {"field_1*"}, null); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); + Tuple> fieldExtraction = extractedFieldsDetector.detect(); + + List allFields = fieldExtraction.v1().getAllFields(); + assertThat(allFields.size(), equalTo(2)); + assertThat(allFields.get(0).getName(), equalTo("field_11")); + assertThat(allFields.get(1).getName(), equalTo("field_12")); + + assertFieldSelectionContains(fieldExtraction.v2(), + FieldSelection.included("field_11", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL), + FieldSelection.included("field_12", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL)); + } + + public void testDetect_GivenSourceFilteringWithExcludes() { + FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder() + .addAggregatableField("field_11", "float") + .addAggregatableField("field_12", "float") + .addAggregatableField("field_21", "float") + .addAggregatableField("field_22", "float").build(); + + sourceFiltering = new FetchSourceContext(true, null, new String[] {"field_1*"}); + + ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector( + SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap()); + Tuple> fieldExtraction = extractedFieldsDetector.detect(); + + List allFields = fieldExtraction.v1().getAllFields(); + assertThat(allFields.size(), equalTo(2)); + assertThat(allFields.get(0).getName(), equalTo("field_21")); + assertThat(allFields.get(1).getName(), equalTo("field_22")); + + assertFieldSelectionContains(fieldExtraction.v2(), + FieldSelection.included("field_21", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL), + FieldSelection.included("field_22", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL)); } - private static DataFrameAnalyticsConfig buildOutlierDetectionConfig(FetchSourceContext analyzedFields) { + private DataFrameAnalyticsConfig buildOutlierDetectionConfig() { return new DataFrameAnalyticsConfig.Builder() .setId("foo") - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD)) .setAnalyzedFields(analyzedFields) .setAnalysis(new OutlierDetection.Builder().build()) .build(); } - private static DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable) { - return buildRegressionConfig(dependentVariable, null); - } - - private static DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable, FetchSourceContext analyzedFields) { + private DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable) { return new DataFrameAnalyticsConfig.Builder() .setId("foo") - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD)) .setAnalyzedFields(analyzedFields) .setAnalysis(new Regression(dependentVariable)) .build(); } - private static DataFrameAnalyticsConfig buildClassificationConfig(String dependentVariable) { + private DataFrameAnalyticsConfig buildClassificationConfig(String dependentVariable) { return new DataFrameAnalyticsConfig.Builder() .setId("foo") - .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null)) + .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering)) .setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD)) .setAnalysis(new Classification(dependentVariable)) .build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java index 0d2b5aea364eb..b1a2ba226b49f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java @@ -71,7 +71,7 @@ public void setUpMocks() { analyticsConfig = new DataFrameAnalyticsConfig.Builder() .setId(JOB_ID) .setDescription(JOB_DESCRIPTION) - .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null)) + .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null)) .setDest(new DataFrameAnalyticsDest("my_dest", null)) .setAnalysis(new Regression("foo")) .build(); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index 6e1828efcd4ba..a1d78b7444057 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -41,7 +41,8 @@ setup: { "source": { "index": "index-source", - "query": {"term" : { "user" : "Kimchy" }} + "query": {"term" : { "user" : "Kimchy" }}, + "_source": [ "obj1.*", "obj2.*" ] }, "dest": { "index": "index-dest" @@ -1852,3 +1853,28 @@ setup: }} - is_true: create_time - is_true: version + +--- +"Test put config given analyzed_fields include field excluded by source": + + - do: + catch: /field \[excluded\] is included in \[analyzed_fields\] but not in \[source._source\]/ + ml.put_data_frame_analytics: + id: "analyzed_fields-include-field-excluded-by-source" + body: > + { + "source": { + "index": "index-source", + "query": {"term" : { "user" : "Kimchy" }}, + "_source": { + "excludes": ["excluded"] + } + }, + "dest": { + "index": "index-dest" + }, + "analysis": {"outlier_detection":{}}, + "analyzed_fields": { + "includes": ["excluded"] + } + }