Skip to content

Commit

Permalink
[ML] Adds progress reporting for transforms (#41278)
Browse files Browse the repository at this point in the history
* [ML] Adds progress reporting for transforms

* fixing after master merge

* Addressing PR comments

* removing unused imports

* Adjusting afterKey handling and percentage to be 100*

* Making sure it is a linked hashmap for serialization

* removing unused import

* addressing PR comments

* removing unused import

* simplifying code, only storing total docs and decrementing

* adjusting for rewrite

* removing initial progress gathering from executor
  • Loading branch information
benwtrent authored Apr 25, 2019
1 parent f26addc commit 9bf8b5a
Show file tree
Hide file tree
Showing 28 changed files with 1,284 additions and 264 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.dataframe.transforms;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DataFrameTransformProgress {

public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
public static final ParseField PERCENT_COMPLETE = new ParseField("percent_complete");

public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",
true,
a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Double)a[2]));

static {
PARSER.declareLong(constructorArg(), TOTAL_DOCS);
PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING);
PARSER.declareDouble(optionalConstructorArg(), PERCENT_COMPLETE);
}

public static DataFrameTransformProgress fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private final long totalDocs;
private final long remainingDocs;
private final double percentComplete;

public DataFrameTransformProgress(long totalDocs, Long remainingDocs, double percentComplete) {
this.totalDocs = totalDocs;
this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs;
this.percentComplete = percentComplete;
}

public double getPercentComplete() {
return percentComplete;
}

public long getTotalDocs() {
return totalDocs;
}

public long getRemainingDocs() {
return remainingDocs;
}

@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}

if (other == null || other.getClass() != getClass()) {
return false;
}

DataFrameTransformProgress that = (DataFrameTransformProgress) other;
return Objects.equals(this.remainingDocs, that.remainingDocs)
&& Objects.equals(this.totalDocs, that.totalDocs)
&& Objects.equals(this.percentComplete, that.percentComplete);
}

@Override
public int hashCode(){
return Objects.hash(remainingDocs, totalDocs, percentComplete);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
Expand All @@ -44,33 +42,25 @@ public class DataFrameTransformState {
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
new ConstructingObjectParser<>("data_frame_transform_state", true,
args -> new DataFrameTransformState((DataFrameTransformTaskState) args[0],
(IndexerState) args[1],
(HashMap<String, Object>) args[2],
(Map<String, Object>) args[2],
(long) args[3],
(String) args[4]));
(String) args[4],
(DataFrameTransformProgress) args[5]));

static {
PARSER.declareField(constructorArg(),
p -> DataFrameTransformTaskState.fromString(p.text()),
TASK_STATE,
ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ObjectParser.ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.START_OBJECT) {
return p.map();
}
if (p.currentToken() == XContentParser.Token.VALUE_NULL) {
return null;
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
PARSER.declareField(constructorArg(), p -> DataFrameTransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING);
PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), INDEXER_STATE, ValueType.STRING);
PARSER.declareField(optionalConstructorArg(), (p, c) -> p.mapOrdered(), CURRENT_POSITION, ValueType.OBJECT);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), CHECKPOINT);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress::fromXContent, PROGRESS, ValueType.OBJECT);
}

public static DataFrameTransformState fromXContent(XContentParser parser) throws IOException {
Expand All @@ -80,19 +70,22 @@ public static DataFrameTransformState fromXContent(XContentParser parser) throws
private final DataFrameTransformTaskState taskState;
private final IndexerState indexerState;
private final long checkpoint;
private final SortedMap<String, Object> currentPosition;
private final Map<String, Object> currentPosition;
private final String reason;
private final DataFrameTransformProgress progress;

public DataFrameTransformState(DataFrameTransformTaskState taskState,
IndexerState indexerState,
@Nullable Map<String, Object> position,
long checkpoint,
@Nullable String reason) {
@Nullable String reason,
@Nullable DataFrameTransformProgress progress) {
this.taskState = taskState;
this.indexerState = indexerState;
this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
this.currentPosition = position == null ? null : Collections.unmodifiableMap(new LinkedHashMap<>(position));
this.checkpoint = checkpoint;
this.reason = reason;
this.progress = progress;
}

public IndexerState getIndexerState() {
Expand All @@ -117,6 +110,11 @@ public String getReason() {
return reason;
}

@Nullable
public DataFrameTransformProgress getProgress() {
return progress;
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand All @@ -132,13 +130,14 @@ public boolean equals(Object other) {
return Objects.equals(this.taskState, that.taskState) &&
Objects.equals(this.indexerState, that.indexerState) &&
Objects.equals(this.currentPosition, that.currentPosition) &&
Objects.equals(this.progress, that.progress) &&
this.checkpoint == that.checkpoint &&
Objects.equals(this.reason, that.reason);
}

@Override
public int hashCode() {
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason);
return Objects.hash(taskState, indexerState, currentPosition, checkpoint, reason, progress);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static DataFrameTransformStateAndStats fromXContent(XContentParser parser
private final DataFrameTransformCheckpointingInfo checkpointingInfo;

public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
DataFrameTransformCheckpointingInfo checkpointingInfo) {
DataFrameTransformCheckpointingInfo checkpointingInfo) {
this.id = id;
this.transformState = state;
this.transformStats = stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

Expand Down Expand Up @@ -360,6 +361,10 @@ public void testGetStats() throws Exception {
assertEquals(DataFrameTransformTaskState.STARTED, stateAndStats.getTransformState().getTaskState());
assertEquals(null, stateAndStats.getTransformState().getReason());
assertNotEquals(zeroIndexerStats, stateAndStats.getTransformStats());
assertNotNull(stateAndStats.getTransformState().getProgress());
assertThat(stateAndStats.getTransformState().getProgress().getPercentComplete(), equalTo(100.0));
assertThat(stateAndStats.getTransformState().getProgress().getTotalDocs(), greaterThan(0L));
assertThat(stateAndStats.getTransformState().getProgress().getRemainingDocs(), equalTo(0L));
});
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.dataframe.transforms;

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;

public class DataFrameTransformProgressTests extends ESTestCase {

public void testFromXContent() throws IOException {
xContentTester(this::createParser,
DataFrameTransformProgressTests::randomInstance,
DataFrameTransformProgressTests::toXContent,
DataFrameTransformProgress::fromXContent)
.supportsUnknownFields(true)
.randomFieldsExcludeFilter(field -> field.startsWith("state"))
.test();
}

public static DataFrameTransformProgress randomInstance() {
long totalDocs = randomNonNegativeLong();
Long docsRemaining = randomBoolean() ? null : randomLongBetween(0, totalDocs);
double percentComplete = totalDocs == 0 ? 1.0 : docsRemaining == null ? 0.0 : 100.0*(double)(totalDocs - docsRemaining)/totalDocs;
return new DataFrameTransformProgress(totalDocs, docsRemaining, percentComplete);
}

public static void toXContent(DataFrameTransformProgress progress, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field(DataFrameTransformProgress.TOTAL_DOCS.getPreferredName(), progress.getTotalDocs());
builder.field(DataFrameTransformProgress.DOCS_REMAINING.getPreferredName(), progress.getRemainingDocs());
builder.field(DataFrameTransformProgress.PERCENT_COMPLETE.getPreferredName(), progress.getPercentComplete());
builder.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public void testFromXContent() throws IOException {

public static DataFrameTransformStateAndStats randomInstance() {
return new DataFrameTransformStateAndStats(randomAlphaOfLength(10),
DataFrameTransformStateTests.randomDataFrameTransformState(),
DataFrameIndexerTransformStatsTests.randomStats(),
DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
DataFrameTransformStateTests.randomDataFrameTransformState(),
DataFrameIndexerTransformStatsTests.randomStats(),
DataFrameTransformCheckpointingInfoTests.randomDataFrameTransformCheckpointingInfo());
}

public static void toXContent(DataFrameTransformStateAndStats stateAndStats, XContentBuilder builder) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
Expand All @@ -46,7 +46,8 @@ public static DataFrameTransformState randomDataFrameTransformState() {
randomFrom(IndexerState.values()),
randomPositionMap(),
randomLongBetween(0,10),
randomBoolean() ? null : randomAlphaOfLength(10));
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : DataFrameTransformProgressTests.randomInstance());
}

public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException {
Expand All @@ -60,6 +61,10 @@ public static void toXContent(DataFrameTransformState state, XContentBuilder bui
if (state.getReason() != null) {
builder.field("reason", state.getReason());
}
if (state.getProgress() != null) {
builder.field("progress");
DataFrameTransformProgressTests.toXContent(state.getProgress(), builder);
}
builder.endObject();
}

Expand All @@ -68,7 +73,7 @@ private static Map<String, Object> randomPositionMap() {
return null;
}
int numFields = randomIntBetween(1, 5);
Map<String, Object> position = new HashMap<>();
Map<String, Object> position = new LinkedHashMap<>();
for (int i = 0; i < numFields; i++) {
Object value;
if (randomBoolean()) {
Expand Down
Loading

0 comments on commit 9bf8b5a

Please sign in to comment.