-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ML: update set_upgrade_mode, add logging #38372
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,7 +64,6 @@ public class DatafeedManager { | |
private final DatafeedJobBuilder datafeedJobBuilder; | ||
private final TaskRunner taskRunner = new TaskRunner(); | ||
private final AutodetectProcessManager autodetectProcessManager; | ||
private volatile boolean isolated; | ||
|
||
public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, | ||
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) { | ||
|
@@ -130,18 +129,20 @@ public void stopAllDatafeedsOnThisNode(String reason) { | |
* This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves | ||
* the datafeed tasks in the "started" state, so that they get restarted on a different node. | ||
*/ | ||
public void isolateAllDatafeedsOnThisNode() { | ||
isolated = true; | ||
public void isolateAllDatafeedsOnThisNodeBeforeShutdown() { | ||
Iterator<Holder> iter = runningDatafeedsOnThisNode.values().iterator(); | ||
while (iter.hasNext()) { | ||
Holder next = iter.next(); | ||
next.isolateDatafeed(); | ||
next.setRelocating(); | ||
// TODO: it's not ideal that this "isolate" method does something a bit different to the one below | ||
next.setNodeIsShuttingDown(); | ||
iter.remove(); | ||
} | ||
} | ||
|
||
public void isolateDatafeed(long allocationId) { | ||
// This calls get() rather than remove() because we expect that the persistent task will | ||
// be removed shortly afterwards and that operation needs to be able to find the holder | ||
Holder holder = runningDatafeedsOnThisNode.get(allocationId); | ||
if (holder != null) { | ||
holder.isolateDatafeed(); | ||
|
@@ -195,7 +196,7 @@ protected void doRun() { | |
holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e); | ||
return; | ||
} | ||
if (isolated == false) { | ||
if (holder.isIsolated() == false) { | ||
if (next != null) { | ||
doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder); | ||
} else { | ||
|
@@ -298,7 +299,7 @@ public class Holder { | |
private final ProblemTracker problemTracker; | ||
private final Consumer<Exception> finishHandler; | ||
volatile Scheduler.Cancellable cancellable; | ||
private volatile boolean isRelocating; | ||
private volatile boolean isNodeShuttingDown; | ||
|
||
Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, | ||
ProblemTracker problemTracker, Consumer<Exception> finishHandler) { | ||
|
@@ -324,7 +325,7 @@ boolean isIsolated() { | |
} | ||
|
||
public void stop(String source, TimeValue timeout, Exception e) { | ||
if (isRelocating) { | ||
if (isNodeShuttingDown) { | ||
return; | ||
} | ||
|
||
|
@@ -344,11 +345,12 @@ public void stop(String source, TimeValue timeout, Exception e) { | |
if (cancellable != null) { | ||
cancellable.cancel(); | ||
} | ||
auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); | ||
auditor.info(datafeedJob.getJobId(), | ||
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED)); | ||
finishHandler.accept(e); | ||
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), | ||
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); | ||
if (autoCloseJob) { | ||
if (autoCloseJob && isIsolated() == false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This highlights the massive complexity caused by separating jobs and datafeeds yet then trying to make them work together in a user friendly way. There are 3 scenarios where we isolate datafeeds:
All this highlights the total lack of encapsulation between jobs and datafeeds. The fact that you need an intimate knowledge of exactly how every usage pattern plays out to make a small change to datafeeds shows that it would make things much easier if they were one and the same thing, as suggested by #34231. But since implementing #34231 would be a multi-month project maybe the best thing is just to get things working with the current separation and then try to change as little as possible in this area. |
||
closeJob(); | ||
} | ||
if (acquired) { | ||
|
@@ -361,16 +363,18 @@ public void stop(String source, TimeValue timeout, Exception e) { | |
} | ||
|
||
/** | ||
* This stops a datafeed WITHOUT updating the corresponding persistent task. It must ONLY be called | ||
* immediately prior to shutting down a node. Then the datafeed task can remain "started", and be | ||
* relocated to a different node. Calling this method at any other time will ruin the datafeed. | ||
* This stops a datafeed WITHOUT updating the corresponding persistent task. When called it | ||
* will stop the datafeed from sending data to its job as quickly as possible. The caller | ||
* must do something sensible with the corresponding persistent task. If the node is shutting | ||
* down the task will automatically get reassigned. Otherwise the caller must take action to | ||
* remove or reassign the persistent task, or the datafeed will be left in limbo. | ||
*/ | ||
public void isolateDatafeed() { | ||
datafeedJob.isolate(); | ||
} | ||
|
||
public void setRelocating() { | ||
isRelocating = true; | ||
public void setNodeIsShuttingDown() { | ||
isNodeShuttingDown = true; | ||
} | ||
|
||
private Long executeLookBack(long startTime, Long endTime) throws Exception { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed this variable to make clearer what it's used for. In the case of isolating a datafeed prior to enabling upgrade mode the datafeed may relocate to a different node when upgrade mode is eventually cancelled. But as long as the node where this code is running remains up it needs to run the
finishHandler.accept(e);
line below otherwise the local task associated with the persistent task will linger and cause problems.