Skip to content

Commit

Permalink
[hotfix][engine][checkpoint] fix checkpoint error in master down (#3140)
Browse files Browse the repository at this point in the history
* [hotfix][checkpoint] checkpoint barrier aligned parallel exception

* [hotfix][engine][checkpoint] fix checkpoint error in master down

* [checkpoint] notify checkpoint timeout
  • Loading branch information
ashulin authored Oct 19, 2022
1 parent 4cecedb commit 63dd9d0
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private void scheduleTriggerPendingCheckpoint(long delayMills) {
scheduler.schedule(this::tryTriggerPendingCheckpoint, delayMills, TimeUnit.MILLISECONDS);
}

private void tryTriggerPendingCheckpoint() {
protected void tryTriggerPendingCheckpoint() {
synchronized (lock) {
final long currentTimestamp = Instant.now().toEpochMilli();
if (currentTimestamp - latestTriggerTimestamp.get() >= coordinatorConfig.getCheckpointInterval() &&
Expand Down Expand Up @@ -282,7 +282,7 @@ private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint>
if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0) {
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
// TODO: notify job master to restore the pipeline.
checkpointManager.handleCheckpointTimeout(pipelineId);
}
}
}, coordinatorConfig.getCheckpointTimeout(),
Expand Down Expand Up @@ -427,6 +427,7 @@ public void completePendingCheckpoint(CompletedCheckpoint completedCheckpoint) {
} catch (IOException | CheckpointStorageException e) {
sneakyThrow(e);
}
LOG.info("pending checkpoint({}/{}@{}) notify finished!", completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId());
InvocationFuture<?>[] invocationFutures = notifyCheckpointCompleted(checkpointId);
CompletableFuture.allOf(invocationFutures).join();
// TODO: notifyCheckpointCompleted fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import com.hazelcast.cluster.Address;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
Expand All @@ -67,8 +67,6 @@ public class CheckpointManager {

private final NodeEngine nodeEngine;

private final Map<Long, Address> subtaskWithAddresses;

/**
* key: the pipeline id of the job;
* <br> value: the checkpoint coordinator of the pipeline;
Expand All @@ -77,13 +75,15 @@ public class CheckpointManager {

private final CheckpointStorage checkpointStorage;

private final JobMaster jobMaster;
public CheckpointManager(long jobId,
NodeEngine nodeEngine,
JobMaster jobMaster,
Map<Integer, CheckpointPlan> checkpointPlanMap,
CheckpointConfig checkpointConfig) throws CheckpointStorageException {
this.jobId = jobId;
this.nodeEngine = nodeEngine;
this.subtaskWithAddresses = new HashMap<>();
this.jobMaster = jobMaster;
this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage())
.create(new HashMap<>());
IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
Expand Down Expand Up @@ -131,6 +131,14 @@ public PassiveCompletableFuture<CompletedCheckpoint> triggerSavepoint(int pipeli
return getCheckpointCoordinator(pipelineId).startSavepoint();
}

/**
 * Reacts to a pipeline being reported as running by asking that pipeline's
 * checkpoint coordinator to try to trigger a pending checkpoint.
 *
 * @param pipelineId the id of the pipeline that was reported as running
 */
public void reportedPipelineRunning(int pipelineId) {
    // Look up the coordinator first, then delegate the trigger attempt to it.
    final CheckpointCoordinator coordinator = getCheckpointCoordinator(pipelineId);
    coordinator.tryTriggerPendingCheckpoint();
}

/**
 * Forwards a checkpoint timeout of the given pipeline to the job master.
 *
 * @param pipelineId the id of the pipeline whose checkpoint timed out
 */
protected void handleCheckpointTimeout(int pipelineId) {
    // The manager takes no action of its own here; the job master handles the timeout.
    this.jobMaster.handleCheckpointTimeout(pipelineId);
}

/**
 * Resolves the checkpoint coordinator responsible for the pipeline that the
 * given task location belongs to.
 *
 * @param taskLocation the location of a task; only its pipeline id is used
 * @return the result of the pipeline-id based coordinator lookup
 */
private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) {
    final int pipelineId = taskLocation.getPipelineId();
    return getCheckpointCoordinator(pipelineId);
}
Expand All @@ -147,9 +155,9 @@ private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
* Called by the {@link Task}.
* <br> used by Task to report the {@link SeaTunnelTaskState} of the state machine.
*/
public void reportedTask(TaskReportStatusOperation reportStatusOperation, Address address) {
public void reportedTask(TaskReportStatusOperation reportStatusOperation) {
// task address may change during restore.
subtaskWithAddresses.put(reportStatusOperation.getLocation().getTaskID(), address);
log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(), reportStatusOperation.getStatus());
getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
}

Expand Down Expand Up @@ -211,6 +219,6 @@ public void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
}

protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, subtaskWithAddresses.get(operation.getTaskLocation().getTaskID()));
return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public void run() {
((SeaTunnelServer) getService())
.getCoordinatorService().getJobMaster(location.getJobId())
.getCheckpointManager()
.reportedTask(this, getCallerAddress());
.reportedTask(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void init(long initializationTimestamp) throws Exception {
this.checkpointManager = new CheckpointManager(
jobImmutableInformation.getJobId(),
nodeEngine,
this,
planTuple.f1(),
checkpointConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

Expand All @@ -94,9 +94,9 @@ public abstract class SeaTunnelTask extends AbstractTask {

protected List<CompletableFuture<Void>> flowFutures;

protected final Map<Long, List<ActionSubtaskState>> checkpointStates = new HashMap<>();
protected final Map<Long, List<ActionSubtaskState>> checkpointStates = new ConcurrentHashMap<>();

private final Map<Long, Integer> cycleAcks = new HashMap<>();
private final Map<Long, Integer> cycleAcks = new ConcurrentHashMap<>();

protected int indexID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -95,7 +94,7 @@ public SinkAggregatedCommitterTask(long jobID, TaskLocation taskID, SinkAction<?
public void init() throws Exception {
super.init();
currState = INIT;
this.checkpointBarrierCounter = new HashMap<>();
this.checkpointBarrierCounter = new ConcurrentHashMap<>();
this.commitInfoCache = new ConcurrentHashMap<>();
this.writerAddressMap = new ConcurrentHashMap<>();
this.checkpointCommitInfoMap = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException
CheckpointManager checkpointManager = new CheckpointManager(
jobId,
nodeEngine,
null,
planMap,
new CheckpointConfig());
Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
Expand Down

0 comments on commit 63dd9d0

Please sign in to comment.