diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java index f56da88303d88..f192b420eba0e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java @@ -62,6 +62,7 @@ public class DatafeedConfig implements ToXContentObject { public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields"); public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config"); public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); + public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "datafeed_config", true, a -> new Builder((String)a[0], (String)a[1])); @@ -88,6 +89,7 @@ public class DatafeedConfig implements ToXContentObject { PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE); PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG); PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG); + PARSER.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); } private static BytesReference parseBytes(XContentParser parser) throws IOException { @@ -107,11 +109,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti private final Integer scrollSize; private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; - + private final Integer maxEmptySearches; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, BytesReference query, BytesReference aggregations, List scriptFields, Integer scrollSize, - ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) { + ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, + Integer maxEmptySearches) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -123,6 +126,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue this.scrollSize = scrollSize; this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; + this.maxEmptySearches = maxEmptySearches; } public String getId() { @@ -169,6 +173,10 @@ public DelayedDataCheckConfig getDelayedDataCheckConfig() { return delayedDataCheckConfig; } + public Integer getMaxEmptySearches() { + return maxEmptySearches; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -205,6 +213,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (delayedDataCheckConfig != null) { builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig); } + if (maxEmptySearches != null) { + builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); + } builder.endObject(); return builder; @@ -245,7 +256,8 @@ public boolean equals(Object other) { && Objects.equals(asMap(this.aggregations), asMap(that.aggregations)) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) - && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig); + && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); } /** @@ -256,7 +268,7 @@ public boolean equals(Object other) { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig, delayedDataCheckConfig); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches); } public static Builder builder(String id, String jobId) { @@ -276,6 +288,7 @@ public static class Builder { private Integer scrollSize; private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; + private Integer maxEmptySearches; public Builder(String id, String jobId) { this.id = Objects.requireNonNull(id, ID.getPreferredName()); @@ -294,6 +307,7 @@ public Builder(DatafeedConfig config) { this.scrollSize = config.scrollSize; this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); + this.maxEmptySearches = config.getMaxEmptySearches(); } public Builder setIndices(List indices) { @@ -376,9 +390,14 @@ public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheck return this; } + public Builder setMaxEmptySearches(int maxEmptySearches) { + this.maxEmptySearches = maxEmptySearches; + return this; + } + public DatafeedConfig build() { return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java index e1ac22d45fa13..a7d76e9a69c96 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java @@ -77,6 +77,7 @@ public class DatafeedUpdate implements ToXContentObject { PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); + PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); } private static BytesReference parseBytes(XContentParser parser) throws IOException { @@ -95,10 +96,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti private final Integer scrollSize; private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; + private final Integer maxEmptySearches; private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, List indices, BytesReference query, BytesReference aggregations, List scriptFields, Integer scrollSize, - ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) { + ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, + Integer maxEmptySearches) { this.id = id; this.queryDelay = queryDelay; this.frequency = frequency; @@ -109,6 +112,7 @@ private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, Lis this.scrollSize = scrollSize; this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; + this.maxEmptySearches = maxEmptySearches; } /** @@ -147,6 +151,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize); addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); + addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches); builder.endObject(); return builder; } @@ -193,6 +198,10 @@ public DelayedDataCheckConfig getDelayedDataCheckConfig() { return delayedDataCheckConfig; } + public Integer getMaxEmptySearches() { + return maxEmptySearches; + } + private static Map asMap(BytesReference bytesReference) { return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2(); } @@ -227,7 +236,8 @@ public boolean equals(Object other) { && Objects.equals(asMap(this.aggregations), asMap(that.aggregations)) && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) - && Objects.equals(this.chunkingConfig, that.chunkingConfig); + && Objects.equals(this.chunkingConfig, that.chunkingConfig) + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); } /** @@ -238,7 +248,7 @@ public boolean equals(Object other) { @Override public int hashCode() { return Objects.hash(id, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig, delayedDataCheckConfig); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches); } public static Builder builder(String id) { @@ -257,6 +267,7 @@ public static class Builder { private Integer scrollSize; private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; + private Integer maxEmptySearches; public Builder(String id) { this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName()); @@ -273,6 +284,7 @@ public Builder(DatafeedUpdate config) { this.scrollSize = config.scrollSize; this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.delayedDataCheckConfig; + this.maxEmptySearches = config.maxEmptySearches; } public Builder setIndices(List indices) { @@ -346,9 +358,14 @@ public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheck return this; } + public Builder setMaxEmptySearches(int maxEmptySearches) { + this.maxEmptySearches = maxEmptySearches; + return this; + } + public DatafeedUpdate build() { return new DatafeedUpdate(id, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java index a3b475193e46b..7f7a03ab2e182 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java @@ -106,6 +106,9 @@ public static DatafeedConfig.Builder createRandomBuilder() { if (randomBoolean()) { builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig()); } + if (randomBoolean()) { + builder.setMaxEmptySearches(randomIntBetween(10, 100)); + } return builder; } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java index 165da632e0a0c..72ab156406fe5 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java @@ -80,6 +80,9 @@ public static DatafeedUpdate createRandom() { if (randomBoolean()) { builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig()); } + if (randomBoolean()) { + builder.setMaxEmptySearches(randomIntBetween(10, 100)); + } return builder.build(); } diff --git a/docs/reference/ml/anomaly-detection/apis/datafeedresource.asciidoc b/docs/reference/ml/anomaly-detection/apis/datafeedresource.asciidoc index 2cdc695ed91e0..389c9d704eacd 100644 --- a/docs/reference/ml/anomaly-detection/apis/datafeedresource.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/datafeedresource.asciidoc @@ -65,6 +65,15 @@ A {dfeed} resource has the following properties: `{"enabled": true, "check_window": "1h"}` See <>. +`max_empty_searches`:: + (integer) If a real-time {dfeed} has never seen any data (including during + any initial training period) then it will automatically stop itself and + close its associated job after this many real-time searches that return no + documents. In other words, it will stop after `frequency` times + `max_empty_searches` of real-time operation. If not set + then a {dfeed} with no end time that sees no data will remain started until + it is explicitly stopped. By default this setting is not set. + [[ml-datafeed-chunking-config]] ==== Chunking configuration objects diff --git a/docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc b/docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc index 910bb727e9763..732f23202b1bf 100644 --- a/docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/update-datafeed.asciidoc @@ -101,6 +101,15 @@ parallel and close one when you are satisfied with the results of the other job. (Optional, unsigned integer) The `size` parameter that is used in {es} searches. The default value is `1000`. +`max_empty_searches`:: + (Optional, integer) If a real-time {dfeed} has never seen any data (including + during any initial training period) then it will automatically stop itself + and close its associated job after this many real-time searches that return + no documents. In other words, it will stop after `frequency` times + `max_empty_searches` of real-time operation. If not set + then a {dfeed} with no end time that sees no data will remain started until + it is explicitly stopped. The special value `-1` unsets this setting. + For more information about these properties, see <>. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 9285256c76819..634d2c71a4010 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -90,6 +91,7 @@ public class DatafeedConfig extends AbstractDiffable implements public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config"); public static final ParseField HEADERS = new ParseField("headers"); public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); + public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches"); // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -151,6 +153,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie parser.declareObject(Builder::setDelayedDataCheckConfig, ignoreUnknownFields ? DelayedDataCheckConfig.LENIENT_PARSER : DelayedDataCheckConfig.STRICT_PARSER, DELAYED_DATA_CHECK_CONFIG); + parser.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES); return parser; } @@ -175,11 +178,12 @@ private static ObjectParser createParser(boolean ignoreUnknownFie private final ChunkingConfig chunkingConfig; private final Map headers; private final DelayedDataCheckConfig delayedDataCheckConfig; + private final Integer maxEmptySearches; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, QueryProvider queryProvider, AggProvider aggProvider, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, Map headers, - DelayedDataCheckConfig delayedDataCheckConfig) { + DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -192,6 +196,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue this.chunkingConfig = chunkingConfig; this.headers = Collections.unmodifiableMap(headers); this.delayedDataCheckConfig = delayedDataCheckConfig; + this.maxEmptySearches = maxEmptySearches; } public DatafeedConfig(StreamInput in) throws IOException { @@ -218,6 +223,12 @@ public DatafeedConfig(StreamInput in) throws IOException { this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); delayedDataCheckConfig = in.readOptionalWriteable(DelayedDataCheckConfig::new); + // TODO: change version in backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + maxEmptySearches = in.readOptionalVInt(); + } else { + maxEmptySearches = null; + } } /** @@ -386,6 +397,10 @@ public DelayedDataCheckConfig getDelayedDataCheckConfig() { return delayedDataCheckConfig; } + public Integer getMaxEmptySearches() { + return maxEmptySearches; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); @@ -414,6 +429,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(chunkingConfig); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); out.writeOptionalWriteable(delayedDataCheckConfig); + // TODO: change version in backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalVInt(maxEmptySearches); + } } @Override @@ -450,6 +469,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (delayedDataCheckConfig != null) { builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig); } + if (maxEmptySearches != null) { + builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); + } builder.endObject(); return builder; } @@ -482,13 +504,14 @@ public boolean equals(Object other) { && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) && Objects.equals(this.headers, that.headers) - && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig); + && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig, - headers, delayedDataCheckConfig); + headers, delayedDataCheckConfig, maxEmptySearches); } @Override @@ -561,6 +584,7 @@ public static class Builder { private ChunkingConfig chunkingConfig; private Map headers = Collections.emptyMap(); private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); + private Integer maxEmptySearches; public Builder() { } @@ -583,6 +607,7 @@ public Builder(DatafeedConfig config) { this.chunkingConfig = config.chunkingConfig; this.headers = new HashMap<>(config.headers); this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); + this.maxEmptySearches = config.getMaxEmptySearches(); } public void setId(String datafeedId) { @@ -676,6 +701,18 @@ public void setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckCon this.delayedDataCheckConfig = delayedDataCheckConfig; } + public void setMaxEmptySearches(int maxEmptySearches) { + if (maxEmptySearches == -1) { + this.maxEmptySearches = null; + } else if (maxEmptySearches <= 0) { + String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, + DatafeedConfig.MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); + throw ExceptionsHelper.badRequestException(msg); + } else { + this.maxEmptySearches = maxEmptySearches; + } + } + public DatafeedConfig build() { ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); @@ -691,7 +728,7 @@ public DatafeedConfig build() { setDefaultQueryDelay(); return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize, - chunkingConfig, headers, delayedDataCheckConfig); + chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches); } void validateScriptFields() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 55bb54dfa5719..d2b381c7eefc2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -78,6 +78,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.STRICT_PARSER, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); + PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES); } private final String id; @@ -91,11 +92,13 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { private final Integer scrollSize; private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; + private final Integer maxEmptySearches; private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, QueryProvider queryProvider, AggProvider aggProvider, List scriptFields, - Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) { + Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig, + Integer maxEmptySearches) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -107,6 +110,7 @@ private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue this.scrollSize = scrollSize; this.chunkingConfig = chunkingConfig; this.delayedDataCheckConfig = delayedDataCheckConfig; + this.maxEmptySearches = maxEmptySearches; } public DatafeedUpdate(StreamInput in) throws IOException { @@ -140,6 +144,12 @@ public DatafeedUpdate(StreamInput in) throws IOException { this.scrollSize = in.readOptionalVInt(); this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new); delayedDataCheckConfig = in.readOptionalWriteable(DelayedDataCheckConfig::new); + // TODO: change version in backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + maxEmptySearches = in.readOptionalInt(); + } else { + maxEmptySearches = null; + } } /** @@ -183,6 +193,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalVInt(scrollSize); out.writeOptionalWriteable(chunkingConfig); out.writeOptionalWriteable(delayedDataCheckConfig); + // TODO: change version in backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalInt(maxEmptySearches); + } } @Override @@ -213,6 +227,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize); addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); addOptionalField(builder, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG, delayedDataCheckConfig); + addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches); + builder.endObject(); return builder; } @@ -281,6 +297,10 @@ public DelayedDataCheckConfig getDelayedDataCheckConfig() { return delayedDataCheckConfig; } + public Integer getMaxEmptySearches() { + return maxEmptySearches; + } + /** * Applies the update to the given {@link DatafeedConfig} * @return a new {@link DatafeedConfig} that contains the update @@ -325,6 +345,9 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map h if (delayedDataCheckConfig != null) { builder.setDelayedDataCheckConfig(delayedDataCheckConfig); } + if (maxEmptySearches != null) { + builder.setMaxEmptySearches(maxEmptySearches); + } if (headers.isEmpty() == false) { // Adjust the request, adding security headers from the current thread context @@ -364,13 +387,14 @@ public boolean equals(Object other) { && Objects.equals(this.aggProvider, that.aggProvider) && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) - && Objects.equals(this.chunkingConfig, that.chunkingConfig); + && Objects.equals(this.chunkingConfig, that.chunkingConfig) + && Objects.equals(this.maxEmptySearches, that.maxEmptySearches); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig, - delayedDataCheckConfig); + delayedDataCheckConfig, maxEmptySearches); } @Override @@ -387,7 +411,8 @@ boolean isNoop(DatafeedConfig datafeed) { && (aggProvider == null || Objects.equals(aggProvider.getAggs(), datafeed.getAggregations())) && (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields())) && (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig())) - && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig())); + && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig())) + && (maxEmptySearches == null || Objects.equals(maxEmptySearches, datafeed.getMaxEmptySearches())); } public static class Builder { @@ -403,6 +428,7 @@ public static class Builder { private Integer scrollSize; private ChunkingConfig chunkingConfig; private DelayedDataCheckConfig delayedDataCheckConfig; + private Integer maxEmptySearches; public Builder() { } @@ -423,6 +449,7 @@ public Builder(DatafeedUpdate config) { this.scrollSize = config.scrollSize; this.chunkingConfig = config.chunkingConfig; this.delayedDataCheckConfig = config.delayedDataCheckConfig; + this.maxEmptySearches = config.maxEmptySearches; } public Builder setId(String datafeedId) { @@ -490,9 +517,19 @@ public Builder setChunkingConfig(ChunkingConfig chunkingConfig) { return this; } + public Builder setMaxEmptySearches(int maxEmptySearches) { + if (maxEmptySearches < -1 || maxEmptySearches == 0) { + String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, + DatafeedConfig.MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); + throw ExceptionsHelper.badRequestException(msg); + } + this.maxEmptySearches = maxEmptySearches; + return this; + } + public DatafeedUpdate build() { return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize, - chunkingConfig, delayedDataCheckConfig); + chunkingConfig, delayedDataCheckConfig, maxEmptySearches); } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 15ba85a8edbf1..3d1677fcc78d3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -7,6 +7,7 @@ import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -68,6 +69,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; public class DatafeedConfigTests extends AbstractSerializingTestCase { @@ -149,6 +151,9 @@ private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(Stri if (randomBoolean()) { builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig(bucketSpanMillis)); } + if (randomBoolean()) { + builder.setMaxEmptySearches(randomIntBetween(10, 100)); + } return builder; } @@ -378,10 +383,10 @@ public void testDefaults() { defaultFeedBuilder.setIndices(Collections.singletonList("index")); DatafeedConfig defaultFeed = defaultFeedBuilder.build(); - assertThat(defaultFeed.getScrollSize(), equalTo(1000)); assertThat(defaultFeed.getQueryDelay().seconds(), greaterThanOrEqualTo(60L)); assertThat(defaultFeed.getQueryDelay().seconds(), lessThan(120L)); + assertThat(defaultFeed.getMaxEmptySearches(), is(nullValue())); } public void testDefaultQueryDelay() { @@ -406,6 +411,20 @@ public void testCheckValid_GivenNullIndices() { expectThrows(IllegalArgumentException.class, () -> conf.setIndices(null)); } + public void testCheckValid_GivenInvalidMaxEmptySearches() { + DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); + ElasticsearchStatusException e = + expectThrows(ElasticsearchStatusException.class, () -> conf.setMaxEmptySearches(randomFrom(-2, 0))); + assertThat(e.getMessage(), containsString("Invalid max_empty_searches value")); + } + + public void testCheckValid_GivenMaxEmptySearchesMinusOne() { + DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); + conf.setIndices(Collections.singletonList("whatever")); + conf.setMaxEmptySearches(-1); + assertThat(conf.build().getMaxEmptySearches(), is(nullValue())); + } + public void testCheckValid_GivenEmptyIndices() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); conf.setIndices(Collections.emptyList()); @@ -824,7 +843,7 @@ private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggre @Override protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOException { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(instance); - switch (between(0, 9)) { + switch (between(0, 10)) { case 0: builder.setId(instance.getId() + randomValidDatafeedId()); break; @@ -886,6 +905,13 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept builder.setChunkingConfig(ChunkingConfig.newAuto()); } break; + case 10: + if (instance.getMaxEmptySearches() == null) { + builder.setMaxEmptySearches(randomIntBetween(10, 100)); + } else { + builder.setMaxEmptySearches(instance.getMaxEmptySearches() + 1); + } + break; default: throw new AssertionError("Illegal randomisation branch"); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index ba4c8543de663..87777d4c75138 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -120,6 +120,9 @@ public static DatafeedUpdate createRandomized(String datafeedId, @Nullable Dataf if (randomBoolean()) { builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig(randomLongBetween(300_001, 400_000))); } + if (randomBoolean()) { + builder.setMaxEmptySearches(randomBoolean() ? -1 : randomIntBetween(10, 100)); + } return builder.build(); } @@ -355,7 +358,7 @@ public void testSerializationOfComplexAggsBetweenVersions() throws IOException { @Override protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOException { DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance); - switch (between(1, 9)) { + switch (between(1, 10)) { case 1: builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId()); break; @@ -426,6 +429,13 @@ protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOExcept builder.setChunkingConfig(null); } break; + case 10: + if (instance.getMaxEmptySearches() == null) { + builder.setMaxEmptySearches(randomFrom(-1, 10)); + } else { + builder.setMaxEmptySearches(instance.getMaxEmptySearches() + 100); + } + break; default: throw new AssertionError("Illegal randomisation branch"); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index 241f3cb0e63d5..6a47ca40db4e9 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; +import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -32,7 +33,6 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -50,7 +50,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { @After - public void cleanup() throws Exception { + public void cleanup() { cleanUp(); } @@ -110,7 +110,7 @@ public void testDatafeedTimingStats_DatafeedRecreated() throws Exception { Job.Builder job = createScheduledJob("lookback-job-datafeed-recreated"); String datafeedId = "lookback-datafeed-datafeed-recreated"; - DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data")); + DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data")); registerJob(job); putJob(job); @@ -149,7 +149,7 @@ public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() thro putJob(job); String datafeedId = "lookback-datafeed-query-delay-updated"; - DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data")); + DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data")); registerDatafeed(datafeedConfig); putDatafeed(datafeedConfig); @@ -204,6 +204,26 @@ public void testRealtime() throws Exception { }); } + public void testRealtime_noDataAndAutoStop() throws Exception { + String jobId = "realtime-job-auto-stop"; + String datafeedId = jobId + "-datafeed"; + startRealtime(jobId, randomIntBetween(1, 3)); + + // Datafeed should auto-stop... + assertBusy(() -> { + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED)); + }); + + // ...and should have auto-closed the job too + assertBusy(() -> { + GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet(); + assertThat(response.getResponse().results().get(0).getState(), equalTo(JobState.CLOSED)); + }); + } + public void testRealtime_multipleStopCalls() throws Exception { String jobId = "realtime-job-multiple-stop"; final String datafeedId = jobId + "-datafeed"; @@ -359,13 +379,22 @@ public void testStopLookbackFollowedByProcessKill() throws Exception { } private void startRealtime(String jobId) throws Exception { + startRealtime(jobId, null); + } + + private void startRealtime(String jobId, Integer maxEmptySearches) throws Exception { client().admin().indices().prepareCreate("data") .addMapping("type", "time", "type=date") .get(); - long numDocs1 = randomIntBetween(32, 2048); long now = System.currentTimeMillis(); - long lastWeek = now - 604800000; - indexDocs(logger, "data", numDocs1, lastWeek, now); + long numDocs1; + if (maxEmptySearches == null) { + numDocs1 = randomIntBetween(32, 2048); + long lastWeek = now - 604800000; + indexDocs(logger, "data", numDocs1, lastWeek, now); + } else { + numDocs1 = 0; + } Job.Builder job = createScheduledJob(jobId); registerJob(job); @@ -373,7 +402,12 @@ private void startRealtime(String jobId) throws Exception { openJob(job.getId()); assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); - DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data")); + DatafeedConfig.Builder datafeedConfigBuilder = + createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data")); + if (maxEmptySearches != null) { + datafeedConfigBuilder.setMaxEmptySearches(maxEmptySearches); + } + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); registerDatafeed(datafeedConfig); putDatafeed(datafeedConfig); startDatafeed(datafeedConfig.getId(), 0L, null); @@ -383,9 +417,15 @@ private void startRealtime(String jobId) throws Exception { assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); }); - long numDocs2 = randomIntBetween(2, 64); now = System.currentTimeMillis(); - indexDocs(logger, "data", numDocs2, now + 5000, now + 6000); + long numDocs2; + if (maxEmptySearches == null) { + numDocs2 = randomIntBetween(2, 64); + indexDocs(logger, "data", numDocs2, now + 5000, now + 6000); + } else { + numDocs2 = 0; + } + assertBusy(() -> { DataCounts dataCounts = getDataCounts(job.getId()); assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 257a194752840..22ebff57a4ba8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -64,6 +64,7 @@ class DatafeedJob { private final DatafeedTimingStatsReporter timingStatsReporter; private final Supplier currentTimeSupplier; private final DelayedDataDetector delayedDataDetector; + private final Integer maxEmptySearches; private volatile long lookbackStartTimeMs; private volatile long latestFinalBucketEndTimeMs; @@ -73,11 +74,12 @@ class DatafeedJob { private volatile Long lastEndTimeMs; private AtomicBoolean running = new AtomicBoolean(true); private volatile boolean isIsolated; + private volatile boolean haveEverSeenData; DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter timingStatsReporter, Client client, AnomalyDetectionAuditor auditor, Supplier currentTimeSupplier, DelayedDataDetector delayedDataDetector, - long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { + Integer maxEmptySearches, long latestFinalBucketEndTimeMs, long latestRecordTimeMs, boolean haveSeenDataPreviously) { this.jobId = jobId; this.dataDescription = Objects.requireNonNull(dataDescription); this.frequencyMs = frequencyMs; @@ -88,11 +90,13 @@ class DatafeedJob { this.auditor = auditor; this.currentTimeSupplier = currentTimeSupplier; this.delayedDataDetector = delayedDataDetector; + this.maxEmptySearches = maxEmptySearches; this.latestFinalBucketEndTimeMs = latestFinalBucketEndTimeMs; long lastEndTime = Math.max(latestFinalBucketEndTimeMs, latestRecordTimeMs); if (lastEndTime > 0) { lastEndTimeMs = lastEndTime; } + this.haveEverSeenData = haveSeenDataPreviously; } void isolate() { @@ -108,6 +112,10 @@ public String getJobId() { return jobId; } + public Integer getMaxEmptySearches() { + return maxEmptySearches; + } + public void finishReportingTimingStats() { timingStatsReporter.finishReporting(); } @@ -380,6 +388,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro break; } recordCount += counts.getProcessedRecordCount(); + haveEverSeenData |= (recordCount > 0); if (counts.getLatestRecordTimeStamp() != null) { lastEndTimeMs = counts.getLatestRecordTimeStamp().getTime(); } @@ -406,7 +415,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro } if (recordCount == 0) { - throw new EmptyDataCountException(nextRealtimeTimestamp()); + throw new EmptyDataCountException(nextRealtimeTimestamp(), haveEverSeenData); } } @@ -509,10 +518,11 @@ static class ExtractionProblemException extends RuntimeException { static class EmptyDataCountException extends RuntimeException { final long nextDelayInMsSinceEpoch; + final boolean haveEverSeenData; - EmptyDataCountException(long nextDelayInMsSinceEpoch) { + EmptyDataCountException(long nextDelayInMsSinceEpoch, boolean haveEverSeenData) { this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch; + this.haveEverSeenData = haveEverSeenData; } } - } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 1d0bd1ae04a8a..dc88dac0e7cce 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -92,8 +92,10 @@ void build(String datafeedId, ActionListener listener) { auditor, currentTimeSupplier, delayedDataDetector, + datafeedConfigHolder.get().getMaxEmptySearches(), context.latestFinalBucketEndMs, - context.latestRecordTimeMs); + context.latestRecordTimeMs, + context.haveSeenDataPreviously); listener.onResponse(datafeedJob); }; @@ -128,6 +130,7 @@ void build(String datafeedId, ActionListener listener) { if (dataCounts.getLatestRecordTimeStamp() != null) { context.latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime(); } + context.haveSeenDataPreviously = (dataCounts.getInputRecordCount() > 0); jobResultsProvider.datafeedTimingStats(jobHolder.get().getId(), datafeedTimingStatsHandler, listener::onFailure); }; @@ -223,6 +226,7 @@ private static DataDescription buildDataDescription(Job job) { private static class Context { volatile long latestFinalBucketEndMs = -1L; volatile long latestRecordTimeMs = -1L; + volatile boolean haveSeenDataPreviously; volatile DataExtractorFactory dataExtractorFactory; volatile DatafeedTimingStatsReporter timingStatsReporter; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 30a3948fcc200..2b7a40abc2e36 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -80,7 +80,6 @@ public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clus clusterService.addListener(taskRunner); } - public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer finishHandler) { String datafeedId = task.getDatafeedId(); @@ -233,7 +232,7 @@ protected void doRun() { long nextDelayInMsSinceEpoch; try { nextDelayInMsSinceEpoch = holder.executeRealTime(); - holder.problemTracker.reportNoneEmptyCount(); + holder.problemTracker.reportNonEmptyDataCount(); } catch (DatafeedJob.ExtractionProblemException e) { nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); @@ -245,8 +244,15 @@ protected void doRun() { return; } } catch (DatafeedJob.EmptyDataCountException e) { + int emptyDataCount = holder.problemTracker.reportEmptyDataCount(); + if (e.haveEverSeenData == false && holder.shouldStopAfterEmptyData(emptyDataCount)) { + logger.warn("Datafeed for [" + jobId + "] has seen no data in [" + emptyDataCount + + "] attempts, and never seen any data previously, so stopping..."); + // In this case we auto-close the job, as though a lookback-only datafeed stopped + holder.stop("no_data", TimeValue.timeValueSeconds(20), e, true); + return; + } nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; - holder.problemTracker.reportEmptyDataCount(); } catch (Exception e) { logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e); holder.stop("general_realtime_error", TimeValue.timeValueSeconds(20), e); @@ -303,7 +309,7 @@ public class Holder { // To ensure that we wait until lookback / realtime search has completed before we stop the datafeed private final ReentrantLock datafeedJobLock = new ReentrantLock(true); private final DatafeedJob datafeedJob; - private final boolean autoCloseJob; + private final boolean defaultAutoCloseJob; private final ProblemTracker problemTracker; private final Consumer finishHandler; volatile Scheduler.Cancellable cancellable; @@ -315,11 +321,16 @@ public class Holder { this.allocationId = task.getAllocationId(); this.datafeedId = datafeedId; this.datafeedJob = datafeedJob; - this.autoCloseJob = task.isLookbackOnly(); + this.defaultAutoCloseJob = task.isLookbackOnly(); this.problemTracker = problemTracker; this.finishHandler = finishHandler; } + boolean shouldStopAfterEmptyData(int emptyDataCount) { + Integer emptyDataCountToStopAt = datafeedJob.getMaxEmptySearches(); + return emptyDataCountToStopAt != null && emptyDataCount >= emptyDataCountToStopAt; + } + String getJobId() { return datafeedJob.getJobId(); } @@ -333,6 +344,10 @@ boolean isIsolated() { } public void stop(String source, TimeValue timeout, Exception e) { + stop(source, timeout, e, defaultAutoCloseJob); + } + + public void stop(String source, TimeValue timeout, Exception e, boolean autoCloseJob) { if (isNodeShuttingDown) { return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java index 69a821d424649..a8260c2eade50 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java @@ -74,16 +74,14 @@ private void reportProblem(String template, String problemMessage) { * Updates the tracking of empty data cycles. If the number of consecutive empty data * cycles reaches {@code EMPTY_DATA_WARN_COUNT}, a warning is reported. */ - public void reportEmptyDataCount() { - if (emptyDataCount < EMPTY_DATA_WARN_COUNT) { - emptyDataCount++; - if (emptyDataCount == EMPTY_DATA_WARN_COUNT) { - auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA)); - } + public int reportEmptyDataCount() { + if (++emptyDataCount == EMPTY_DATA_WARN_COUNT) { + auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA)); } + return emptyDataCount; } - public void reportNoneEmptyCount() { + public void reportNonEmptyDataCount() { if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) { auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_DATA_SEEN_AGAIN)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 47f8a386d3384..b28480354d9b6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -133,7 +133,7 @@ public void setup() throws Exception { } public void testLookBackRunWithEndTime() throws Exception { - DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean()); assertNull(datafeedJob.runLookBack(0L, 1000L)); verify(dataExtractorFactory).newExtractor(0L, 1000L); @@ -145,7 +145,7 @@ public void testLookBackRunWithEndTime() throws Exception { public void testSetIsolated() throws Exception { currentTime = 2000L; - DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean()); datafeedJob.isolate(); assertNull(datafeedJob.runLookBack(0L, null)); @@ -158,7 +158,7 @@ public void testLookBackRunWithNoEndTime() throws Exception { currentTime = 2000L; long frequencyMs = 1000; long queryDelayMs = 500; - DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, -1, -1); + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, -1, -1, randomBoolean()); long next = datafeedJob.runLookBack(0L, null); assertEquals(2000 + frequencyMs + queryDelayMs + 100, next); @@ -181,7 +181,7 @@ public void testLookBackRunWithStartTimeEarlierThanResumePoint() throws Exceptio long frequencyMs = 1000; long queryDelayMs = 500; - DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs); + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs, true); long next = datafeedJob.runLookBack(0L, null); assertEquals(10000 + frequencyMs + queryDelayMs + 100, next); @@ -206,7 +206,7 @@ public void testContinueFromNow() throws Exception { long frequencyMs = 1000; long queryDelayMs = 500; - DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs); + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs, true); datafeedJob.runLookBack(currentTime, null); // advance time @@ -238,7 +238,7 @@ public void testRealtimeRun() throws Exception { currentTime = 60000L; long frequencyMs = 100; long queryDelayMs = 1000; - DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1); + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, false); long next = datafeedJob.runRealtime(); assertEquals(currentTime + frequencyMs + 100, next); @@ -344,7 +344,7 @@ public void testRealtimeRun() throws Exception { public void testEmptyDataCountGivenlookback() throws Exception { when(dataExtractor.hasNext()).thenReturn(false); - DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, false); expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runLookBack(0L, 1000L)); verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any()); verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); @@ -355,7 +355,7 @@ public void testExtractionProblem() throws Exception { when(dataExtractor.hasNext()).thenReturn(true); when(dataExtractor.next()).thenThrow(new IOException()); - DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean()); expectThrows(DatafeedJob.ExtractionProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L)); currentTime = 3001; @@ -382,7 +382,7 @@ public void testPostAnalysisProblem() { when(dataExtractor.getEndTime()).thenReturn(1000L); - DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean()); DatafeedJob.AnalysisProblemException analysisProblemException = expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L)); assertThat(analysisProblemException.shouldStop, is(false)); @@ -411,7 +411,7 @@ public void testPostAnalysisProblemIsConflict() { when(dataExtractor.getEndTime()).thenReturn(1000L); - DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1); + DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean()); DatafeedJob.AnalysisProblemException analysisProblemException = expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L)); assertThat(analysisProblemException.shouldStop, is(true)); @@ -436,7 +436,7 @@ public void testFlushAnalysisProblem() { currentTime = 60000L; long frequencyMs = 100; long queryDelayMs = 1000; - DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1); + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, randomBoolean()); DatafeedJob.AnalysisProblemException analysisProblemException = expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runRealtime()); assertThat(analysisProblemException.shouldStop, is(false)); @@ -448,16 +448,17 @@ public void testFlushAnalysisProblemIsConflict() { currentTime = 60000L; long frequencyMs = 100; long queryDelayMs = 1000; - DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1); + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, randomBoolean()); DatafeedJob.AnalysisProblemException analysisProblemException = expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runRealtime()); assertThat(analysisProblemException.shouldStop, is(true)); } private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, - long latestRecordTimeMs) { + long latestRecordTimeMs, boolean haveSeenDataPreviously) { Supplier currentTimeSupplier = () -> currentTime; return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter, - client, auditor, currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs); + client, auditor, currentTimeSupplier, delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs, + haveSeenDataPreviously); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 765c70e00ad1d..c72b11b772ff8 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -114,6 +114,7 @@ public void setUpTests() { when(datafeedJob.isRunning()).thenReturn(true); when(datafeedJob.stop()).thenReturn(true); when(datafeedJob.getJobId()).thenReturn(job.getId()); + when(datafeedJob.getMaxEmptySearches()).thenReturn(null); DatafeedJobBuilder datafeedJobBuilder = mock(DatafeedJobBuilder.class); doAnswer(invocationOnMock -> { @SuppressWarnings("rawtypes") @@ -133,7 +134,7 @@ public void setUpTests() { } public void testLookbackOnly_WarnsWhenNoDataIsRetrieved() throws Exception { - when(datafeedJob.runLookBack(0L, 60000L)).thenThrow(new DatafeedJob.EmptyDataCountException(0L)); + when(datafeedJob.runLookBack(0L, 60000L)).thenThrow(new DatafeedJob.EmptyDataCountException(0L, false)); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); datafeedManager.run(task, handler); @@ -176,8 +177,8 @@ public void testStart_emptyDataCountException() throws Exception { return mock(Scheduler.ScheduledCancellable.class); }).when(threadPool).schedule(any(), any(), any()); - when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L)); - when(datafeedJob.runRealtime()).thenThrow(new DatafeedJob.EmptyDataCountException(0L)); + when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L, false)); + when(datafeedJob.runRealtime()).thenThrow(new DatafeedJob.EmptyDataCountException(0L, false)); Consumer handler = mockConsumer(); DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java index bfbd85ca94441..d4deb6c665040 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java @@ -85,7 +85,7 @@ public void testUpdateEmptyDataCount_GivenNonEmptyAfterNineEmpty() { for (int i = 0; i < 9; i++) { problemTracker.reportEmptyDataCount(); } - problemTracker.reportNoneEmptyCount(); + problemTracker.reportNonEmptyDataCount(); Mockito.verifyNoMoreInteractions(auditor); } @@ -94,7 +94,7 @@ public void testUpdateEmptyDataCount_GivenNonEmptyAfterTenEmpty() { for (int i = 0; i < 10; i++) { problemTracker.reportEmptyDataCount(); } - problemTracker.reportNoneEmptyCount(); + problemTracker.reportNonEmptyDataCount(); verify(auditor).warning("foo", "Datafeed has been retrieving no data for a while"); verify(auditor).info("foo", "Datafeed has started retrieving data again"); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml index c31e855023ca5..8c6e94635f4da 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml @@ -181,8 +181,10 @@ setup: "indexes":["index-foo"], "scroll_size": 2000, "frequency": "1m", - "query_delay": "30s" + "query_delay": "30s", + "max_empty_searches": 42 } + - match: { max_empty_searches: 42 } - do: ml.update_datafeed: @@ -192,7 +194,8 @@ setup: "indexes":["index-*"], "scroll_size": 10000, "frequency": "2m", - "query_delay": "0s" + "query_delay": "0s", + "max_empty_searches": -1 } - match: { datafeed_id: "test-datafeed-1" } - match: { job_id: "datafeeds-crud-1" } @@ -200,6 +203,7 @@ setup: - match: { scroll_size: 10000 } - match: { frequency: "2m" } - match: { query_delay: "0s" } + - is_false: max_empty_searches --- "Test update datafeed to point to different job": @@ -357,7 +361,8 @@ setup: } } } - } + }, + "max_empty_searches": -1 } - do: ml.get_datafeeds: @@ -367,6 +372,7 @@ setup: - match: { datafeeds.0.aggregations.histogram_buckets.aggs.@timestamp.max.field: "@timestamp" } - match: { datafeeds.0.aggregations.histogram_buckets.aggs.bytes_in_avg.avg.field: "system.network.in.bytes" } - match: { datafeeds.0.aggregations.histogram_buckets.aggs.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" } + - is_false: max_empty_searches --- "Test delete datafeed":