Skip to content

Commit

Permalink
[hotfix][engine][checkpoint] fix checkpoint error in master down
Browse files Browse the repository at this point in the history
  • Loading branch information
ashulin committed Oct 19, 2022
1 parent c1cb7ee commit 22d42e2
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private void scheduleTriggerPendingCheckpoint(long delayMills) {
scheduler.schedule(this::tryTriggerPendingCheckpoint, delayMills, TimeUnit.MILLISECONDS);
}

private void tryTriggerPendingCheckpoint() {
protected void tryTriggerPendingCheckpoint() {
synchronized (lock) {
final long currentTimestamp = Instant.now().toEpochMilli();
if (currentTimestamp - latestTriggerTimestamp.get() >= coordinatorConfig.getCheckpointInterval() &&
Expand Down Expand Up @@ -427,6 +427,7 @@ public void completePendingCheckpoint(CompletedCheckpoint completedCheckpoint) {
} catch (IOException | CheckpointStorageException e) {
sneakyThrow(e);
}
LOG.info("pending checkpoint({}/{}@{}) notify finished!", completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId());
InvocationFuture<?>[] invocationFutures = notifyCheckpointCompleted(checkpointId);
CompletableFuture.allOf(invocationFutures).join();
// TODO: notifyCheckpointCompleted fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import com.hazelcast.cluster.Address;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
Expand All @@ -67,8 +67,6 @@ public class CheckpointManager {

private final NodeEngine nodeEngine;

private final Map<Long, Address> subtaskWithAddresses;

/**
* key: the pipeline id of the job;
* <br> value: the checkpoint coordinator of the pipeline;
Expand All @@ -77,13 +75,15 @@ public class CheckpointManager {

private final CheckpointStorage checkpointStorage;

private final JobMaster jobMaster;
public CheckpointManager(long jobId,
NodeEngine nodeEngine,
JobMaster jobMaster,
Map<Integer, CheckpointPlan> checkpointPlanMap,
CheckpointConfig checkpointConfig) throws CheckpointStorageException {
this.jobId = jobId;
this.nodeEngine = nodeEngine;
this.subtaskWithAddresses = new HashMap<>();
this.jobMaster = jobMaster;
this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage())
.create(new HashMap<>());
IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
Expand Down Expand Up @@ -131,6 +131,10 @@ public PassiveCompletableFuture<CompletedCheckpoint> triggerSavepoint(int pipeli
return getCheckpointCoordinator(pipelineId).startSavepoint();
}

public void reportedPipelineRunning(int pipelineId) {
getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint();
}

private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) {
return getCheckpointCoordinator(taskLocation.getPipelineId());
}
Expand All @@ -147,9 +151,9 @@ private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
* Called by the {@link Task}.
* <br> used by Task to report the {@link SeaTunnelTaskState} of the state machine.
*/
public void reportedTask(TaskReportStatusOperation reportStatusOperation, Address address) {
public void reportedTask(TaskReportStatusOperation reportStatusOperation) {
// task address may change during restore.
subtaskWithAddresses.put(reportStatusOperation.getLocation().getTaskID(), address);
log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(), reportStatusOperation.getStatus());
getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
}

Expand Down Expand Up @@ -211,6 +215,6 @@ public void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
}

protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, subtaskWithAddresses.get(operation.getTaskLocation().getTaskID()));
return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ public void run() {
((SeaTunnelServer) getService())
.getCoordinatorService().getJobMaster(location.getJobId())
.getCheckpointManager()
.reportedTask(this, getCallerAddress());
.reportedTask(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void init(long initializationTimestamp) throws Exception {
this.checkpointManager = new CheckpointManager(
jobImmutableInformation.getJobId(),
nodeEngine,
this,
planTuple.f1(),
checkpointConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException
CheckpointManager checkpointManager = new CheckpointManager(
jobId,
nodeEngine,
null,
planMap,
new CheckpointConfig());
Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
Expand Down

0 comments on commit 22d42e2

Please sign in to comment.