Skip to content

Commit

Permalink
[7.x] [Transform] Align transform checkpoint range with date_histogra…
Browse files Browse the repository at this point in the history
…m interval for better performance (#74004) (#76570)
  • Loading branch information
przemekwitek authored Aug 16, 2021
1 parent d13ec5e commit 2a1f171
Show file tree
Hide file tree
Showing 23 changed files with 680 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,25 @@ public class SettingsConfig implements ToXContentObject {
private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
private static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
private static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");
private static final ParseField INTERIM_RESULTS = new ParseField("interim_results");
private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
private static final float DEFAULT_DOCS_PER_SECOND = -1F;

// use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side)
private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1;

// use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side)
private static final int DEFAULT_INTERIM_RESULTS = -1;

private final Integer maxPageSearchSize;
private final Float docsPerSecond;
private final Integer datesAsEpochMillis;
private final Integer interimResults;

private static final ConstructingObjectParser<SettingsConfig, Void> PARSER = new ConstructingObjectParser<>(
"settings_config",
true,
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2])
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3])
);

static {
Expand All @@ -51,16 +56,24 @@ public class SettingsConfig implements ToXContentObject {
DATES_AS_EPOCH_MILLIS,
ValueType.BOOLEAN_OR_NULL
);
// this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
PARSER.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_INTERIM_RESULTS : p.booleanValue() ? 1 : 0,
INTERIM_RESULTS,
ValueType.BOOLEAN_OR_NULL
);
}

public static SettingsConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}

SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis) {
SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer interimResults) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
this.datesAsEpochMillis = datesAsEpochMillis;
this.interimResults = interimResults;
}

@Override
Expand All @@ -87,6 +100,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(DATES_AS_EPOCH_MILLIS.getPreferredName(), datesAsEpochMillis > 0 ? true : false);
}
}
if (interimResults != null) {
if (interimResults.equals(DEFAULT_INTERIM_RESULTS)) {
builder.field(INTERIM_RESULTS.getPreferredName(), (Boolean) null);
} else {
builder.field(INTERIM_RESULTS.getPreferredName(), interimResults > 0 ? true : false);
}
}
builder.endObject();
return builder;
}
Expand All @@ -103,6 +123,10 @@ public Boolean getDatesAsEpochMillis() {
return datesAsEpochMillis != null ? datesAsEpochMillis > 0 : null;
}

public Boolean getInterimResults() {
return interimResults != null ? interimResults > 0 : null;
}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand All @@ -115,12 +139,13 @@ public boolean equals(Object other) {
SettingsConfig that = (SettingsConfig) other;
return Objects.equals(maxPageSearchSize, that.maxPageSearchSize)
&& Objects.equals(docsPerSecond, that.docsPerSecond)
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis);
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis)
&& Objects.equals(interimResults, that.interimResults);
}

@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis);
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, interimResults);
}

public static Builder builder() {
Expand All @@ -131,6 +156,7 @@ public static class Builder {
private Integer maxPageSearchSize;
private Float docsPerSecond;
private Integer datesAsEpochMillis;
private Integer interimResults;

/**
* Sets the paging maximum paging maxPageSearchSize that transform can use when
Expand Down Expand Up @@ -176,8 +202,21 @@ public Builder setDatesAsEpochMillis(Boolean datesAsEpochMillis) {
return this;
}

/**
* Whether to write interim results in transform checkpoints.
*
* An explicit `null` resets to default.
*
* @param interimResults true if interim results should be written.
* @return the {@link Builder} with interimResults set.
*/
public Builder setInterimResults(Boolean interimResults) {
this.interimResults = interimResults == null ? DEFAULT_INTERIM_RESULTS : interimResults ? 1 : 0;
return this;
}

public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis);
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, interimResults);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static SettingsConfig randomSettingsConfig() {
return new SettingsConfig(
randomBoolean() ? null : randomIntBetween(10, 10_000),
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : randomIntBetween(-1, 1),
randomBoolean() ? null : randomIntBetween(-1, 1)
);
}
Expand Down Expand Up @@ -72,6 +73,7 @@ public void testExplicitNullOnWriteParser() throws IOException {
assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set"));

config = fromString("{\"dates_as_epoch_millis\" : null}");
assertFalse(config.getDatesAsEpochMillis());
Expand All @@ -80,6 +82,16 @@ public void testExplicitNullOnWriteParser() throws IOException {
assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"));
assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set"));

config = fromString("{\"interim_results\" : null}");
assertFalse(config.getInterimResults());

settingsAsMap = xContentToMap(config);
assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("interim_results", "not_set"));
}

public void testExplicitNullOnWriteBuilder() throws IOException {
Expand All @@ -91,10 +103,12 @@ public void testExplicitNullOnWriteBuilder() throws IOException {
assertNull(settingsAsMap.getOrDefault("max_page_search_size", "not_set"));
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set"));

SettingsConfig emptyConfig = new SettingsConfig.Builder().build();
assertNull(emptyConfig.getMaxPageSearchSize());
assertNull(emptyConfig.getDatesAsEpochMillis());
assertNull(emptyConfig.getInterimResults());

settingsAsMap = xContentToMap(emptyConfig);
assertTrue(settingsAsMap.isEmpty());
Expand All @@ -106,6 +120,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException {
assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set"));

config = new SettingsConfig.Builder().setDatesAsEpochMillis(null).build();
// returns false, however it's `null` as in "use default", checked next
Expand All @@ -115,6 +130,17 @@ public void testExplicitNullOnWriteBuilder() throws IOException {
assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"));
assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set"));

config = new SettingsConfig.Builder().setInterimResults(null).build();
// returns false, however it's `null` as in "use default", checked next
assertFalse(config.getInterimResults());

settingsAsMap = xContentToMap(config);
assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("interim_results", "not_set"));
}

private Map<String, Object> xContentToMap(ToXContent xcontent) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public static org.elasticsearch.xpack.core.transform.transforms.SettingsConfig r
return new org.elasticsearch.xpack.core.transform.transforms.SettingsConfig(
randomBoolean() ? null : randomIntBetween(10, 10_000),
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1)
);
}
Expand All @@ -34,6 +35,7 @@ public static void assertHlrcEquals(
assertEquals(serverTestInstance.getMaxPageSearchSize(), clientInstance.getMaxPageSearchSize());
assertEquals(serverTestInstance.getDocsPerSecond(), clientInstance.getDocsPerSecond());
assertEquals(serverTestInstance.getDatesAsEpochMillis(), clientInstance.getDatesAsEpochMillis());
assertEquals(serverTestInstance.getInterimResults(), clientInstance.getInterimResults());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public final class TransformField {
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
public static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
public static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");
public static final ParseField INTERIM_RESULTS = new ParseField("interim_results");
public static final ParseField FIELD = new ParseField("field");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME = new ParseField("time");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ public class SettingsConfig implements Writeable, ToXContentObject {
private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
private static final float DEFAULT_DOCS_PER_SECOND = -1F;
private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1;
private static final int DEFAULT_INTERIM_RESULTS = -1;

private static ConstructingObjectParser<SettingsConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<SettingsConfig, Void> parser = new ConstructingObjectParser<>(
"transform_config_settings",
lenient,
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2])
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3])
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE);
parser.declareFloatOrNull(optionalConstructorArg(), DEFAULT_DOCS_PER_SECOND, TransformField.DOCS_PER_SECOND);
Expand All @@ -50,25 +51,39 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
TransformField.DATES_AS_EPOCH_MILLIS,
ValueType.BOOLEAN_OR_NULL
);
// this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
parser.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_INTERIM_RESULTS : p.booleanValue() ? 1 : 0,
TransformField.INTERIM_RESULTS,
ValueType.BOOLEAN_OR_NULL
);
return parser;
}

private final Integer maxPageSearchSize;
private final Float docsPerSecond;
private final Integer datesAsEpochMillis;
private final Integer interimResults;

public SettingsConfig() {
this(null, null, (Integer) null);
this(null, null, (Integer) null, (Integer) null);
}

public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Boolean datesAsEpochMillis) {
this(maxPageSearchSize, docsPerSecond, datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0);
public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Boolean datesAsEpochMillis, Boolean interimResults) {
this(
maxPageSearchSize,
docsPerSecond,
datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0,
interimResults == null ? null : interimResults ? 1 : 0
);
}

public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis) {
public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer interimResults) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
this.datesAsEpochMillis = datesAsEpochMillis;
this.interimResults = interimResults;
}

public SettingsConfig(final StreamInput in) throws IOException {
Expand All @@ -79,6 +94,11 @@ public SettingsConfig(final StreamInput in) throws IOException {
} else {
this.datesAsEpochMillis = DEFAULT_DATES_AS_EPOCH_MILLIS;
}
if (in.getVersion().onOrAfter(Version.CURRENT)) { // TODO: 7.15
this.interimResults = in.readOptionalInt();
} else {
this.interimResults = DEFAULT_INTERIM_RESULTS;
}
}

public Integer getMaxPageSearchSize() {
Expand All @@ -97,6 +117,14 @@ public Integer getDatesAsEpochMillisForUpdate() {
return datesAsEpochMillis;
}

public Boolean getInterimResults() {
return interimResults != null ? interimResults > 0 : null;
}

public Integer getInterimResultsForUpdate() {
return interimResults;
}

public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) {
validationException = addValidationError(
Expand All @@ -118,6 +146,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_11_0)) {
out.writeOptionalInt(datesAsEpochMillis);
}
if (out.getVersion().onOrAfter(Version.CURRENT)) { // TODO: 7.15
out.writeOptionalInt(interimResults);
}
}

@Override
Expand All @@ -133,6 +164,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (datesAsEpochMillis != null && (datesAsEpochMillis.equals(DEFAULT_DATES_AS_EPOCH_MILLIS) == false)) {
builder.field(TransformField.DATES_AS_EPOCH_MILLIS.getPreferredName(), datesAsEpochMillis > 0 ? true : false);
}
if (interimResults != null && (interimResults.equals(DEFAULT_INTERIM_RESULTS) == false)) {
builder.field(TransformField.INTERIM_RESULTS.getPreferredName(), interimResults > 0 ? true : false);
}
builder.endObject();
return builder;
}
Expand All @@ -149,12 +183,13 @@ public boolean equals(Object other) {
SettingsConfig that = (SettingsConfig) other;
return Objects.equals(maxPageSearchSize, that.maxPageSearchSize)
&& Objects.equals(docsPerSecond, that.docsPerSecond)
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis);
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis)
&& Objects.equals(interimResults, that.interimResults);
}

@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis);
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, interimResults);
}

@Override
Expand All @@ -170,6 +205,7 @@ public static class Builder {
private Integer maxPageSearchSize;
private Float docsPerSecond;
private Integer datesAsEpochMillis;
private Integer interimResults;

/**
* Default builder
Expand All @@ -185,6 +221,7 @@ public Builder(SettingsConfig base) {
this.maxPageSearchSize = base.maxPageSearchSize;
this.docsPerSecond = base.docsPerSecond;
this.datesAsEpochMillis = base.datesAsEpochMillis;
this.interimResults = base.interimResults;
}

/**
Expand Down Expand Up @@ -231,6 +268,19 @@ public Builder setDatesAsEpochMillis(Boolean datesAsEpochMillis) {
return this;
}

/**
* Whether to write interim results in transform checkpoints.
*
* An explicit `null` resets to default.
*
* @param interimResults true if interim results should be written.
* @return the {@link Builder} with interimResults set.
*/
public Builder setInterimResults(Boolean interimResults) {
this.interimResults = interimResults == null ? DEFAULT_INTERIM_RESULTS : interimResults ? 1 : 0;
return this;
}

/**
* Update settings according to given settings config.
*
Expand All @@ -253,12 +303,17 @@ public Builder update(SettingsConfig update) {
? null
: update.getDatesAsEpochMillisForUpdate();
}
if (update.getInterimResultsForUpdate() != null) {
this.interimResults = update.getInterimResultsForUpdate().equals(DEFAULT_INTERIM_RESULTS)
? null
: update.getInterimResultsForUpdate();
}

return this;
}

public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis);
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, interimResults);
}
}
}
Loading

0 comments on commit 2a1f171

Please sign in to comment.