Skip to content

Commit

Permalink
[Hotfix][Zeta] Fix task cannot be stopped when system is busy (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Aug 6, 2024
1 parent 9d56cc3 commit 73632ba
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.NodeState;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
Expand Down Expand Up @@ -624,9 +625,12 @@ private void updateMetricsContextInImap() {
});
});
if (localMap.size() > 0) {
boolean lockedIMap = false;
try {
if (!metricsImap.tryLock(
Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS)) {
lockedIMap =
metricsImap.tryLock(
Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS);
if (!lockedIMap) {
logger.warning("try lock failed in update metrics");
return;
}
Expand All @@ -640,10 +644,16 @@ private void updateMetricsContextInImap() {
"The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time",
e);
} finally {
try {
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
} catch (Throwable e) {
logger.warning("unlock imap failed in update metrics", e);
if (lockedIMap) {
boolean unLockedIMap = false;
while (!unLockedIMap) {
try {
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
unLockedIMap = true;
} catch (OperationTimeoutException e) {
logger.warning("unlock imap failed in update metrics", e);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@

import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.datamodel.Tuple2;
Expand Down Expand Up @@ -674,8 +675,12 @@ public void removeMetricsContext(
if ((pipelineStatus.equals(PipelineStatus.FINISHED)
&& !checkpointManager.isPipelineSavePointEnd(pipelineLocation))
|| pipelineStatus.equals(PipelineStatus.CANCELED)) {

boolean lockedIMap = false;
try {
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
lockedIMap = true;

HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
if (centralMap != null) {
Expand All @@ -693,7 +698,17 @@ public void removeMetricsContext(
metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
}
} finally {
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
if (lockedIMap) {
boolean unLockedIMap = false;
while (!unLockedIMap) {
try {
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
unLockedIMap = true;
} catch (OperationTimeoutException e) {
LOGGER.warning("unlock imap failed in update metrics", e);
}
}
}
}
}
}
Expand Down

0 comments on commit 73632ba

Please sign in to comment.