Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Add option to stop datafeed that finds no data #47922

Merged
merged 7 commits into from
Oct 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DatafeedConfig implements ToXContentObject {
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches");

public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
Expand All @@ -88,6 +89,7 @@ public class DatafeedConfig implements ToXContentObject {
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -107,11 +109,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;

private final Integer maxEmptySearches;

private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
Expand All @@ -123,6 +126,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
}

public String getId() {
Expand Down Expand Up @@ -169,6 +173,10 @@ public DelayedDataCheckConfig getDelayedDataCheckConfig() {
return delayedDataCheckConfig;
}

public Integer getMaxEmptySearches() {
return maxEmptySearches;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -205,6 +213,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (delayedDataCheckConfig != null) {
builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
}
if (maxEmptySearches != null) {
builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
}

builder.endObject();
return builder;
Expand Down Expand Up @@ -245,7 +256,8 @@ public boolean equals(Object other) {
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig);
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
}

/**
Expand All @@ -256,7 +268,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig, delayedDataCheckConfig);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
}

public static Builder builder(String id, String jobId) {
Expand All @@ -276,6 +288,7 @@ public static class Builder {
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;

public Builder(String id, String jobId) {
this.id = Objects.requireNonNull(id, ID.getPreferredName());
Expand All @@ -294,6 +307,7 @@ public Builder(DatafeedConfig config) {
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
this.maxEmptySearches = config.getMaxEmptySearches();
}

public Builder setIndices(List<String> indices) {
Expand Down Expand Up @@ -376,9 +390,14 @@ public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheck
return this;
}

public Builder setMaxEmptySearches(int maxEmptySearches) {
this.maxEmptySearches = maxEmptySearches;
return this;
}

public DatafeedConfig build() {
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class DatafeedUpdate implements ToXContentObject {
PARSER.declareObject(Builder::setDelayedDataCheckConfig,
DelayedDataCheckConfig.PARSER,
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -95,10 +96,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;

private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
this.id = id;
this.queryDelay = queryDelay;
this.frequency = frequency;
Expand All @@ -109,6 +112,7 @@ private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, Lis
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
}

/**
Expand Down Expand Up @@ -147,6 +151,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches);
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -193,6 +198,10 @@ public DelayedDataCheckConfig getDelayedDataCheckConfig() {
return delayedDataCheckConfig;
}

public Integer getMaxEmptySearches() {
return maxEmptySearches;
}

private static Map<String, Object> asMap(BytesReference bytesReference) {
return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
}
Expand Down Expand Up @@ -227,7 +236,8 @@ public boolean equals(Object other) {
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
}

/**
Expand All @@ -238,7 +248,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig, delayedDataCheckConfig);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
}

public static Builder builder(String id) {
Expand All @@ -257,6 +267,7 @@ public static class Builder {
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;

public Builder(String id) {
this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
Expand All @@ -273,6 +284,7 @@ public Builder(DatafeedUpdate config) {
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
this.maxEmptySearches = config.maxEmptySearches;
}

public Builder setIndices(List<String> indices) {
Expand Down Expand Up @@ -346,9 +358,14 @@ public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheck
return this;
}

public Builder setMaxEmptySearches(int maxEmptySearches) {
this.maxEmptySearches = maxEmptySearches;
return this;
}

public DatafeedUpdate build() {
return new DatafeedUpdate(id, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public static DatafeedConfig.Builder createRandomBuilder() {
if (randomBoolean()) {
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
}
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public static DatafeedUpdate createRandom() {
if (randomBoolean()) {
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
}
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ A {dfeed} resource has the following properties:
`{"enabled": true, "check_window": "1h"}` See
<<ml-datafeed-delayed-data-check-config>>.

`max_empty_searches`::
(integer) If a real-time {dfeed} has never seen any data (including during
any initial training period) then it will automatically stop itself and
close its associated job after this many real-time searches that return no
documents. In other words, it will stop after `frequency` times
`max_empty_searches` of real-time operation. If not set
then a {dfeed} with no end time that sees no data will remain started until
it is explicitly stopped. By default this setting is not set.

[[ml-datafeed-chunking-config]]
==== Chunking configuration objects

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ parallel and close one when you are satisfied with the results of the other job.
(Optional, unsigned integer) The `size` parameter that is used in {es}
searches. The default value is `1000`.

`max_empty_searches`::
(Optional, integer) If a real-time {dfeed} has never seen any data (including
during any initial training period) then it will automatically stop itself
and close its associated job after this many real-time searches that return
no documents. In other words, it will stop after `frequency` times
`max_empty_searches` of real-time operation. If not set
then a {dfeed} with no end time that sees no data will remain started until
it is explicitly stopped. The special value `-1` unsets this setting.

For more information about these properties, see <<ml-datafeed-resource>>.


Expand Down
Loading