Skip to content

Commit

Permalink
Revert "IGNITE-16916 Job cancellation routine improvement: configurable graceful stop period before interrupting job worker thread - Fixes apache#10005."
Browse files Browse the repository at this point in the history

This reverts commit 7357847.
  • Loading branch information
sergey-chugunov-1985 committed May 5, 2022
1 parent de436d6 commit d1b236b
Show file tree
Hide file tree
Showing 36 changed files with 209 additions and 932 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2897,7 +2897,7 @@ public synchronized void submit(GridFutureAdapter notificationFut, Runnable cmd)
if (!isInterruptedException)
U.error(log, "Exception in discovery notifier worker thread.", t);

if (!isInterruptedException || !isCancelled.get()) {
if (!isInterruptedException || !isCancelled) {
FailureType type = t instanceof OutOfMemoryError ? CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION;

ctx.failure().process(new FailureContext(type, t));
Expand Down Expand Up @@ -3077,7 +3077,7 @@ void addEvent(NotificationEvent notificationEvt) {
onIdle();
}
catch (InterruptedException e) {
if (!isCancelled.get())
if (!isCancelled)
ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, e));

throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,6 @@ private class CleanupWorker extends GridWorker {
cctx.exchange().affinityReadyFuture(AffinityTopologyVersion.ZERO).get();
}
catch (IgniteCheckedException ex) {
if (cctx.kernalContext().isStopping()) {
isCancelled.set(true);

return; // Node is stopped before affinity has prepared.
}

throw new IgniteException("Failed to wait for initialization topology [err="
+ ex.getMessage() + ']', ex);
}
Expand Down Expand Up @@ -220,13 +214,13 @@ private class CleanupWorker extends GridWorker {
}
catch (Throwable t) {
if (X.hasCause(t, NodeStoppingException.class)) {
isCancelled.set(true); // Treat node stopping as valid worker cancellation.
isCancelled = true; // Treat node stopping as valid worker cancellation.

return;
}

if (!(t instanceof IgniteInterruptedCheckedException || t instanceof InterruptedException)) {
if (isCancelled.get())
if (isCancelled)
return;

err = t;
Expand All @@ -235,7 +229,7 @@ private class CleanupWorker extends GridWorker {
throw t;
}
finally {
if (err == null && !isCancelled.get())
if (err == null && !isCancelled)
err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly");

if (err instanceof OutOfMemoryError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ synchronized void startTaskAsync(int typeId, int typeVer) {
body0();
}
catch (InterruptedException e) {
if (!isCancelled.get()) {
if (!isCancelled) {
ctx.failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, e));

throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ private IgniteThreadPoolExecutor initializeCheckpointPool() {
throw t;
}
finally {
if (err == null && !(isCancelled.get()))
if (err == null && !(isCancelled))
err = new IllegalStateException("Thread is terminated unexpectedly: " + name());

if (err instanceof OutOfMemoryError)
Expand Down Expand Up @@ -826,7 +826,7 @@ private void waitCheckpointEvent() {
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();

isCancelled.set(true);
isCancelled = true;
}
}

Expand Down Expand Up @@ -887,7 +887,7 @@ private void startCheckpointProgress() {
log.debug("Cancelling grid runnable: " + this);

// Do not interrupt runner thread.
isCancelled.set(true);
isCancelled = true;

synchronized (this) {
notifyAll();
Expand Down Expand Up @@ -915,7 +915,7 @@ public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
public void shutdownNow() {
shutdownNow = true;

if (!isCancelled.get())
if (!isCancelled)
cancel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1890,7 +1890,7 @@ private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() throws IgniteChecke
*/
private void shutdown() throws IgniteInterruptedCheckedException {
synchronized (this) {
isCancelled.set(true);
isCancelled = true;

notifyAll();
}
Expand Down Expand Up @@ -1985,7 +1985,7 @@ private void shutdown() throws IgniteInterruptedCheckedException {
Thread.currentThread().interrupt();

synchronized (this) {
isCancelled.set(true);
isCancelled = true;
}
}
catch (Throwable t) {
Expand Down Expand Up @@ -2141,7 +2141,7 @@ private void allocateRemainingFiles() throws StorageException {
public void restart() {
assert runner() == null : "FileArchiver is still running";

isCancelled.set(false);
isCancelled = false;

new IgniteThread(archiver).start();
}
Expand Down Expand Up @@ -2249,7 +2249,7 @@ private class FileCompressorWorker extends GridWorker {
void restart() {
assert runner() == null : "FileCompressorWorker is still running.";

isCancelled.set(false);
isCancelled = false;

new IgniteThread(this).start();
}
Expand Down Expand Up @@ -2530,7 +2530,7 @@ private class FileDecompressor extends GridWorker {
U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " +
"[tmp=" + unzipTmp + ", raw=" + unzip + "]", e);
}
else if (!isCancelled.get()) {
else if (!isCancelled) {
ex = new IgniteCheckedException("Error during WAL segment decompression [segmentIdx=" +
segmentToDecompress + "]", e);
}
Expand All @@ -2549,14 +2549,14 @@ else if (!isCancelled.get()) {
catch (InterruptedException e) {
Thread.currentThread().interrupt();

if (!isCancelled.get())
if (!isCancelled)
err = e;
}
catch (Throwable t) {
err = t;
}
finally {
if (err == null && !isCancelled.get())
if (err == null && !isCancelled)
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");

if (err instanceof OutOfMemoryError)
Expand Down Expand Up @@ -2605,7 +2605,7 @@ private void shutdown() {
void restart() {
assert runner() == null : "FileDecompressor is still running.";

isCancelled.set(false);
isCancelled = false;

new IgniteThread(this).start();
}
Expand Down Expand Up @@ -3292,7 +3292,7 @@ public FileCleaner(IgniteLogger log) {
catch (IgniteInterruptedCheckedException e) {
Thread.currentThread().interrupt();

isCancelled.set(true);
isCancelled = true;
}
catch (Throwable t) {
err = t;
Expand All @@ -3314,7 +3314,7 @@ else if (err != null)
* @throws IgniteInterruptedCheckedException If failed to wait for worker shutdown.
*/
private void shutdown() throws IgniteInterruptedCheckedException {
isCancelled.set(true);
isCancelled = true;

U.join(this);
}
Expand All @@ -3325,7 +3325,7 @@ private void shutdown() throws IgniteInterruptedCheckedException {
public void restart() {
assert runner() == null : "FileCleaner is still running";

isCancelled.set(false);
isCancelled = false;

new IgniteThread(this).start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ else if (pos == FILE_FORCE)

unparkWaiters(MAX_VALUE);

if (err == null && !isCancelled.get())
if (err == null && !isCancelled)
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");

if (err instanceof OutOfMemoryError)
Expand Down Expand Up @@ -592,7 +592,7 @@ private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, Igni
public void restart() {
assert runner() == null : "WALWriter is still running.";

isCancelled.set(false);
isCancelled = false;

new IgniteThread(this).start();
}
Expand Down Expand Up @@ -649,7 +649,7 @@ private void shutdown() throws IgniteInterruptedCheckedException {
public void restart() {
assert runner() == null : "WalSegmentSyncer is running.";

isCancelled.set(false);
isCancelled = false;

new IgniteThread(walSegmentSyncWorker).start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.ACTUALIZE;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.CLUSTER_WIDE_UPDATE;
Expand Down Expand Up @@ -125,7 +124,7 @@ private synchronized void switchCurrentActionTo(AllowableAction to) {
* @param propKey Key of specific property.
* @return Property key for meta storage.
*/
public static String toMetaStorageKey(String propKey) {
private static String toMetaStorageKey(String propKey) {
return DIST_CONF_PREFIX + propKey;
}

Expand Down Expand Up @@ -170,7 +169,7 @@ public List<DistributedChangeableProperty<Serializable>> properties() {
* @param name Property name.
* @return Public property.
*/
public @Nullable DistributedChangeableProperty<Serializable> property(String name) {
public DistributedChangeableProperty<Serializable> property(String name) {
DistributedChangeableProperty<?> p = props.get(name);

if (!(p instanceof DistributedChangeableProperty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
Expand All @@ -88,7 +87,6 @@
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
Expand All @@ -115,11 +113,9 @@
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty.detachedLongProperty;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.CPU_LOAD;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
Expand Down Expand Up @@ -170,12 +166,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
/** Total jobs waiting time metric name. */
public static final String WAITING_TIME = "WaitingTime";

/**
* Distributed property that defines the timeout for interrupting the
* {@link GridJobWorker worker} after {@link GridJobWorker#cancel() cancellation}, in milliseconds.
*/
public static final String COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT = "computeJobWorkerInterruptTimeout";

/** */
private final Marshaller marsh;

Expand Down Expand Up @@ -322,10 +312,6 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
@Nullable private final String jobPriAttrKey;

/** Timeout for interrupting {@link GridJobWorker workers} after {@link GridJobWorker#cancel cancel}, in milliseconds. */
private final DistributedLongProperty computeJobWorkerInterruptTimeout =
detachedLongProperty(COMPUTE_JOB_WORKER_INTERRUPT_TIMEOUT);

/**
* @param ctx Kernal context.
*/
Expand Down Expand Up @@ -392,15 +378,6 @@ public GridJobProcessor(GridKernalContext ctx) {
taskPriAttrKey = null;
jobPriAttrKey = null;
}

ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
computeJobWorkerInterruptTimeout.addListener(makeUpdateListener(
"Compute job parameter '%s' was changed from '%s' to '%s'",
log
));

dispatcher.registerProperty(computeJobWorkerInterruptTimeout);
});
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -901,7 +878,7 @@ private boolean removeFromPassive(GridJobWorker job) {
* In most cases this method should be called from main read lock
* to avoid jobs activation after node stop has started.
*/
public void handleCollisions() {
private void handleCollisions() {
assert !jobAlwaysActivate;

if (handlingCollision.get()) {
Expand Down Expand Up @@ -1350,9 +1327,7 @@ public void processJobExecuteRequest(ClusterNode node, final GridJobExecuteReque
holdLsnr,
partsReservation,
req.getTopVer(),
req.executorName(),
this::computeJobWorkerInterruptTimeout
);
req.executorName());

jobCtx.job(job);

Expand Down Expand Up @@ -2439,11 +2414,4 @@ else if (jobId == null)
else
return w -> sesId.equals(w.getSession().getId()) && jobId.equals(w.getJobId());
}

/**
* @return Interruption timeout of {@link GridJobWorker workers} (in millis) after {@link GridWorker#cancel cancel} is called.
*/
public long computeJobWorkerInterruptTimeout() {
return computeJobWorkerInterruptTimeout.getOrDefault(ctx.config().getFailureDetectionTimeout());
}
}
Loading

0 comments on commit d1b236b

Please sign in to comment.