Skip to content

Commit

Permalink
[ML] Record the time of job state changes
Browse files Browse the repository at this point in the history
It's useful to know whether jobs have been failed for long
periods of time or just failed recently, as this may affect
the recommended course of action for recovery or cleanup.

To help with that, this change adds a last changed time to
the task state for ML jobs. The task state contains more
than just the job state. It also contains the allocation ID
and assignment reason. Therefore it is not a perfect record
of when a job entered the failed state. However, we would
expect that jobs will not be moved around after failing, so
it's pretty good.

Due to the distinction between job state and task state this
new information is not exposed in the standard stats responses.
It is available to developers looking for detailed support
information within the task states in cluster state.
  • Loading branch information
droberts195 committed Nov 15, 2023
1 parent b8b1d47 commit 58ee256
Show file tree
Hide file tree
Showing 27 changed files with 272 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ static TransportVersion def(int id) {
public static final TransportVersion REPO_ANALYSIS_REGISTER_OP_COUNT_ADDED = def(8_534_00_0);
public static final TransportVersion ML_TRAINED_MODEL_PREFIX_STRINGS_ADDED = def(8_535_00_0);
public static final TransportVersion COUNTED_KEYWORD_ADDED = def(8_536_00_0);
public static final TransportVersion ML_STATE_CHANGE_TIMESTAMPS = def(8_537_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,26 @@ public void setTaskId(String taskId) {
this.taskId = taskId;
}

public String getTaskId() {
return taskId;
}

public void setAllocationId(long allocationId) {
this.allocationId = allocationId;
}

public long getAllocationId() {
return allocationId;
}

public void setState(PersistentTaskState state) {
this.state = state;
}

public PersistentTaskState getState() {
return state;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState;
import org.elasticsearch.xpack.core.ml.utils.MemoryTrackedTaskState;

import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
Expand Down Expand Up @@ -194,6 +195,17 @@ public static JobState getJobStateModifiedForReassignments(@Nullable PersistentT
return jobState;
}

public static Instant getLastJobTaskStateChangeTime(String jobId, @Nullable PersistentTasksCustomMetadata tasks) {
PersistentTasksCustomMetadata.PersistentTask<?> task = getJobTask(jobId, tasks);
if (task != null) {
JobTaskState jobTaskState = (JobTaskState) task.getState();
if (jobTaskState != null) {
return jobTaskState.getLastStateChangeTime();
}
}
return null;
}

public static SnapshotUpgradeState getSnapshotUpgradeState(
String jobId,
String snapshotId,
Expand Down Expand Up @@ -260,6 +272,17 @@ public static DataFrameAnalyticsState getDataFrameAnalyticsState(@Nullable Persi
return state;
}

public static Instant getLastDataFrameAnalyticsTaskStateChangeTime(String analyticsId, @Nullable PersistentTasksCustomMetadata tasks) {
PersistentTasksCustomMetadata.PersistentTask<?> task = getDataFrameAnalyticsTask(analyticsId, tasks);
if (task != null) {
DataFrameAnalyticsTaskState taskState = (DataFrameAnalyticsTaskState) task.getState();
if (taskState != null) {
return taskState.getLastStateChangeTime();
}
}
return null;
}

/**
* The job Ids of anomaly detector job tasks.
* All anomaly detector jobs are returned regardless of the status of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,57 @@
*/
package org.elasticsearch.xpack.core.ml.dataframe;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.utils.MlTaskState;

import java.io.IOException;
import java.time.Instant;
import java.util.Objects;

public class DataFrameAnalyticsTaskState implements PersistentTaskState {
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DataFrameAnalyticsTaskState implements PersistentTaskState, MlTaskState {

public static final String NAME = MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME;

private static ParseField STATE = new ParseField("state");
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
private static ParseField REASON = new ParseField("reason");
private static final ParseField STATE = new ParseField("state");
private static final ParseField ALLOCATION_ID = new ParseField("allocation_id");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField LAST_STATE_CHANGE_TIME = new ParseField("last_state_change_time");

private final DataFrameAnalyticsState state;
private final long allocationId;
private final String reason;
private final Instant lastStateChangeTime;

private static final ConstructingObjectParser<DataFrameAnalyticsTaskState, Void> PARSER = new ConstructingObjectParser<>(
NAME,
true,
a -> new DataFrameAnalyticsTaskState((DataFrameAnalyticsState) a[0], (long) a[1], (String) a[2])
a -> new DataFrameAnalyticsTaskState((DataFrameAnalyticsState) a[0], (long) a[1], (String) a[2], (Instant) a[3])
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameAnalyticsState::fromString, STATE);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), ALLOCATION_ID);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON);
PARSER.declareField(
optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, LAST_STATE_CHANGE_TIME.getPreferredName()),
LAST_STATE_CHANGE_TIME,
ObjectParser.ValueType.VALUE
);
}

public static DataFrameAnalyticsTaskState fromXContent(XContentParser parser) {
Expand All @@ -52,27 +67,49 @@ public static DataFrameAnalyticsTaskState fromXContent(XContentParser parser) {
}
}

public DataFrameAnalyticsTaskState(DataFrameAnalyticsState state, long allocationId, @Nullable String reason) {
public DataFrameAnalyticsTaskState(
DataFrameAnalyticsState state,
long allocationId,
@Nullable String reason,
@Nullable Instant lastStateChangeTime
) {
this.state = Objects.requireNonNull(state);
this.allocationId = allocationId;
this.reason = reason;
// Round to millisecond to avoid serialization round trip differences
this.lastStateChangeTime = (lastStateChangeTime != null) ? Instant.ofEpochMilli(lastStateChangeTime.toEpochMilli()) : null;
}

public DataFrameAnalyticsTaskState(StreamInput in) throws IOException {
this.state = DataFrameAnalyticsState.fromStream(in);
this.allocationId = in.readLong();
this.reason = in.readOptionalString();
if (in.getTransportVersion().onOrAfter(TransportVersions.ML_STATE_CHANGE_TIMESTAMPS)) {
lastStateChangeTime = in.readOptionalInstant();
} else {
lastStateChangeTime = null;
}
}

public DataFrameAnalyticsState getState() {
return state;
}

public long getAllocationId() {
return allocationId;
}

@Nullable
public String getReason() {
return reason;
}

@Override
@Nullable
public Instant getLastStateChangeTime() {
return lastStateChangeTime;
}

public boolean isStatusStale(PersistentTasksCustomMetadata.PersistentTask<?> task) {
return allocationId != task.getAllocationId();
}
Expand All @@ -87,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException {
state.writeTo(out);
out.writeLong(allocationId);
out.writeOptionalString(reason);
if (out.getTransportVersion().onOrAfter(TransportVersions.ML_STATE_CHANGE_TIMESTAMPS)) {
out.writeOptionalInstant(lastStateChangeTime);
}
}

@Override
Expand All @@ -97,6 +137,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (reason != null) {
builder.field(REASON.getPreferredName(), reason);
}
if (lastStateChangeTime != null) {
builder.timeField(
LAST_STATE_CHANGE_TIME.getPreferredName(),
LAST_STATE_CHANGE_TIME.getPreferredName() + "_string",
lastStateChangeTime.toEpochMilli()
);
}
builder.endObject();
return builder;
}
Expand All @@ -106,11 +153,14 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DataFrameAnalyticsTaskState that = (DataFrameAnalyticsTaskState) o;
return allocationId == that.allocationId && state == that.state && Objects.equals(reason, that.reason);
return allocationId == that.allocationId
&& state == that.state
&& Objects.equals(reason, that.reason)
&& Objects.equals(lastStateChangeTime, that.lastStateChangeTime);
}

@Override
public int hashCode() {
return Objects.hash(state, allocationId, reason);
return Objects.hash(state, allocationId, reason, lastStateChangeTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,53 @@
*/
package org.elasticsearch.xpack.core.ml.job.config;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.utils.MlTaskState;

import java.io.IOException;
import java.time.Instant;
import java.util.Objects;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class JobTaskState implements PersistentTaskState {
public class JobTaskState implements PersistentTaskState, MlTaskState {

public static final String NAME = MlTasks.JOB_TASK_NAME;

private static ParseField STATE = new ParseField("state");
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");
private static ParseField REASON = new ParseField("reason");
private static final ParseField STATE = new ParseField("state");
private static final ParseField ALLOCATION_ID = new ParseField("allocation_id");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField LAST_STATE_CHANGE_TIME = new ParseField("last_state_change_time");

private static final ConstructingObjectParser<JobTaskState, Void> PARSER = new ConstructingObjectParser<>(
NAME,
true,
args -> new JobTaskState((JobState) args[0], (Long) args[1], (String) args[2])
args -> new JobTaskState((JobState) args[0], (Long) args[1], (String) args[2], (Instant) args[3])
);

static {
PARSER.declareString(constructorArg(), JobState::fromString, STATE);
PARSER.declareLong(constructorArg(), ALLOCATION_ID);
PARSER.declareString(optionalConstructorArg(), REASON);
PARSER.declareField(
optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, LAST_STATE_CHANGE_TIME.getPreferredName()),
LAST_STATE_CHANGE_TIME,
ObjectParser.ValueType.VALUE
);
}

public static JobTaskState fromXContent(XContentParser parser) {
Expand All @@ -54,28 +66,46 @@ public static JobTaskState fromXContent(XContentParser parser) {
private final JobState state;
private final long allocationId;
private final String reason;
private final Instant lastStateChangeTime;

public JobTaskState(JobState state, long allocationId, @Nullable String reason) {
public JobTaskState(JobState state, long allocationId, @Nullable String reason, @Nullable Instant lastStateChangeTime) {
this.state = Objects.requireNonNull(state);
this.allocationId = allocationId;
this.reason = reason;
// Round to millisecond to avoid serialization round trip differences
this.lastStateChangeTime = (lastStateChangeTime != null) ? Instant.ofEpochMilli(lastStateChangeTime.toEpochMilli()) : null;
}

public JobTaskState(StreamInput in) throws IOException {
state = JobState.fromStream(in);
allocationId = in.readLong();
reason = in.readOptionalString();
if (in.getTransportVersion().onOrAfter(TransportVersions.ML_STATE_CHANGE_TIMESTAMPS)) {
lastStateChangeTime = in.readOptionalInstant();
} else {
lastStateChangeTime = null;
}
}

public JobState getState() {
return state;
}

public long getAllocationId() {
return allocationId;
}

@Nullable
public String getReason() {
return reason;
}

@Override
@Nullable
public Instant getLastStateChangeTime() {
return lastStateChangeTime;
}

/**
* The job state stores the allocation ID at the time it was last set.
* This method compares the allocation ID in the state with the allocation
Expand All @@ -101,6 +131,9 @@ public void writeTo(StreamOutput out) throws IOException {
state.writeTo(out);
out.writeLong(allocationId);
out.writeOptionalString(reason);
if (out.getTransportVersion().onOrAfter(TransportVersions.ML_STATE_CHANGE_TIMESTAMPS)) {
out.writeOptionalInstant(lastStateChangeTime);
}
}

@Override
Expand All @@ -111,6 +144,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (reason != null) {
builder.field(REASON.getPreferredName(), reason);
}
if (lastStateChangeTime != null) {
builder.timeField(
LAST_STATE_CHANGE_TIME.getPreferredName(),
LAST_STATE_CHANGE_TIME.getPreferredName() + "_string",
lastStateChangeTime.toEpochMilli()
);
}
builder.endObject();
return builder;
}
Expand All @@ -120,11 +160,14 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JobTaskState that = (JobTaskState) o;
return state == that.state && Objects.equals(allocationId, that.allocationId) && Objects.equals(reason, that.reason);
return state == that.state
&& Objects.equals(allocationId, that.allocationId)
&& Objects.equals(reason, that.reason)
&& Objects.equals(lastStateChangeTime, that.lastStateChangeTime);
}

@Override
public int hashCode() {
return Objects.hash(state, allocationId, reason);
return Objects.hash(state, allocationId, reason, lastStateChangeTime);
}
}
Loading

0 comments on commit 58ee256

Please sign in to comment.