Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ML: update set_upgrade_mode, add logging #38372

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class Messages {
public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]";
public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
public static final String JOB_AUDIT_DATAFEED_ISOLATED = "Datafeed isolated";
public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''";
public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}";
public static final String JOB_AUDIT_DELETED = "Job deleted";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public synchronized void stop() {
// datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds
// could fail if they send data to a dead autodetect process.
if (datafeedManager != null) {
datafeedManager.isolateAllDatafeedsOnThisNode();
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
}
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
if (nativeController != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe
.sorted(Comparator.comparing(PersistentTask::getTaskName))
.collect(Collectors.toList());

logger.info("Un-assigning persistent tasks : " +
datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]")));

TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
r -> true,
Expand All @@ -287,6 +290,7 @@ private void isolateDatafeeds(PersistentTasksCustomMetaData tasksCustomMetaData,
ActionListener<List<IsolateDatafeedAction.Response>> listener) {
Set<String> datafeedsToIsolate = MlTasks.startedDatafeedIds(tasksCustomMetaData);

logger.info("Isolating datafeeds: " + datafeedsToIsolate.toString());
TypedChainTaskExecutor<IsolateDatafeedAction.Response> isolateDatafeedsExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class DatafeedManager {
private final DatafeedJobBuilder datafeedJobBuilder;
private final TaskRunner taskRunner = new TaskRunner();
private final AutodetectProcessManager autodetectProcessManager;
private volatile boolean isolated;

public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
Expand Down Expand Up @@ -130,18 +129,20 @@ public void stopAllDatafeedsOnThisNode(String reason) {
* This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves
* the datafeed tasks in the "started" state, so that they get restarted on a different node.
*/
public void isolateAllDatafeedsOnThisNode() {
isolated = true;
public void isolateAllDatafeedsOnThisNodeBeforeShutdown() {
Iterator<Holder> iter = runningDatafeedsOnThisNode.values().iterator();
while (iter.hasNext()) {
Holder next = iter.next();
next.isolateDatafeed();
next.setRelocating();
// TODO: it's not ideal that this "isolate" method does something a bit different to the one below
next.setNodeIsShuttingDown();
iter.remove();
}
}

public void isolateDatafeed(long allocationId) {
// This calls get() rather than remove() because we expect that the persistent task will
// be removed shortly afterwards and that operation needs to be able to find the holder
Holder holder = runningDatafeedsOnThisNode.get(allocationId);
if (holder != null) {
holder.isolateDatafeed();
Expand Down Expand Up @@ -195,7 +196,7 @@ protected void doRun() {
holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e);
return;
}
if (isolated == false) {
if (holder.isIsolated() == false) {
if (next != null) {
doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder);
} else {
Expand Down Expand Up @@ -298,7 +299,7 @@ public class Holder {
private final ProblemTracker problemTracker;
private final Consumer<Exception> finishHandler;
volatile Scheduler.Cancellable cancellable;
private volatile boolean isRelocating;
private volatile boolean isNodeShuttingDown;

Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob,
ProblemTracker problemTracker, Consumer<Exception> finishHandler) {
Expand All @@ -324,7 +325,7 @@ boolean isIsolated() {
}

public void stop(String source, TimeValue timeout, Exception e) {
if (isRelocating) {
if (isNodeShuttingDown) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed this variable to make clearer what it's used for. In the case of isolating a datafeed prior to enabling upgrade mode the datafeed may relocate to a different node when upgrade mode is eventually cancelled. But as long as the node where this code is running remains up it needs to run the finishHandler.accept(e); line below otherwise the local task associated with the persistent task will linger and cause problems.

return;
}

Expand All @@ -344,11 +345,12 @@ public void stop(String source, TimeValue timeout, Exception e) {
if (cancellable != null) {
cancellable.cancel();
}
auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
auditor.info(datafeedJob.getJobId(),
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED));
finishHandler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");
if (autoCloseJob) {
if (autoCloseJob && isIsolated() == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This highlights the massive complexity caused by separating jobs and datafeeds yet then trying to make them work together in a user friendly way.

There are 3 scenarios where we isolate datafeeds:

  1. Node shutting down - this is handled by the if at the very beginning of the method.
  2. Upgrade mode - in this case we most definitely do not want to close the associated job.
  3. Force-delete datafeed - this usually happens as part of the UI deleting a job and datafeed, so closing the job is unnecessary as the force-delete of the job that comes soon after the force-delete of the datafeed will do that. But theoretically a user could force-delete just the datafeed and not the job, and in this case it would make sense to auto-close the job. However, leaving the job open is the path that is least likely to do damage, so we'll go with that.

All this highlights the total lack of encapsulation between jobs and datafeeds. The fact that you need an intimate knowledge of exactly how every usage pattern plays out to make a small change to datafeeds shows that it would make things much easier if they were one and the same thing, as suggested by #34231. But since implementing #34231 would be a multi-month project maybe the best thing is just to get things working with the current separation and then try to change as little as possible in this area.

closeJob();
}
if (acquired) {
Expand All @@ -361,16 +363,18 @@ public void stop(String source, TimeValue timeout, Exception e) {
}

/**
* This stops a datafeed WITHOUT updating the corresponding persistent task. It must ONLY be called
* immediately prior to shutting down a node. Then the datafeed task can remain "started", and be
* relocated to a different node. Calling this method at any other time will ruin the datafeed.
* This stops a datafeed WITHOUT updating the corresponding persistent task. When called it
* will stop the datafeed from sending data to its job as quickly as possible. The caller
* must do something sensible with the corresponding persistent task. If the node is shutting
* down the task will automatically get reassigned. Otherwise the caller must take action to
* remove or reassign the persistent task, or the datafeed will be left in limbo.
*/
public void isolateDatafeed() {
datafeedJob.isolate();
}

public void setRelocating() {
isRelocating = true;
public void setNodeIsShuttingDown() {
isNodeShuttingDown = true;
}

private Long executeLookBack(long startTime, Long endTime) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ setup:
indices.create:
index: airline-data
body:
settings:
index:
number_of_replicas: 0
number_of_shards: 1
mappings:
properties:
time:
Expand Down Expand Up @@ -53,10 +57,9 @@ setup:
job_id: set-upgrade-mode-job

- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed
cluster.health:
index: airline-data
wait_for_status: green

---
teardown:
Expand All @@ -70,6 +73,10 @@ teardown:

---
"Test setting upgrade_mode to false when it is already false":
- do:
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed

- do:
ml.set_upgrade_mode:
enabled: false
Expand All @@ -92,6 +99,22 @@ teardown:

---
"Setting upgrade_mode to enabled":
- do:
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed

- do:
cat.tasks: {}
- match:
$body: |
/.+job.+/

- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/

- do:
ml.info: {}
- match: { upgrade_mode: false }
Expand Down Expand Up @@ -125,6 +148,22 @@ teardown:

---
"Setting upgrade mode to disabled from enabled":
- do:
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed

- do:
cat.tasks: {}
- match:
$body: |
/.+job.+/

- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/

- do:
ml.set_upgrade_mode:
enabled: true
Expand Down