diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index a2dab3d48c3..1274a14f46b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -225,7 +225,7 @@ private void scheduleTriggerPendingCheckpoint(long delayMills) { scheduler.schedule(this::tryTriggerPendingCheckpoint, delayMills, TimeUnit.MILLISECONDS); } - private void tryTriggerPendingCheckpoint() { + protected void tryTriggerPendingCheckpoint() { synchronized (lock) { final long currentTimestamp = Instant.now().toEpochMilli(); if (currentTimestamp - latestTriggerTimestamp.get() >= coordinatorConfig.getCheckpointInterval() && @@ -427,6 +427,7 @@ public void completePendingCheckpoint(CompletedCheckpoint completedCheckpoint) { } catch (IOException | CheckpointStorageException e) { sneakyThrow(e); } + LOG.info("pending checkpoint({}/{}@{}) notify finished!", completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId()); InvocationFuture[] invocationFutures = notifyCheckpointCompleted(checkpointId); CompletableFuture.allOf(invocationFutures).join(); // TODO: notifyCheckpointCompleted fail diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index fd48f7e6553..21054d535c2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -38,11 +38,11 @@ import org.apache.seatunnel.engine.server.execution.TaskGroup; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; import org.apache.seatunnel.engine.server.execution.TaskLocation; +import org.apache.seatunnel.engine.server.master.JobMaster; import org.apache.seatunnel.engine.server.task.operation.TaskOperation; import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; -import com.hazelcast.cluster.Address; import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; @@ -67,8 +67,6 @@ public class CheckpointManager { private final NodeEngine nodeEngine; - private final Map subtaskWithAddresses; - /** * key: the pipeline id of the job; *
value: the checkpoint coordinator of the pipeline; @@ -77,13 +75,15 @@ public class CheckpointManager { private final CheckpointStorage checkpointStorage; + private final JobMaster jobMaster; public CheckpointManager(long jobId, NodeEngine nodeEngine, + JobMaster jobMaster, Map checkpointPlanMap, CheckpointConfig checkpointConfig) throws CheckpointStorageException { this.jobId = jobId; this.nodeEngine = nodeEngine; - this.subtaskWithAddresses = new HashMap<>(); + this.jobMaster = jobMaster; this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage()) .create(new HashMap<>()); IMap checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId)); @@ -131,6 +131,10 @@ public PassiveCompletableFuture triggerSavepoint(int pipeli return getCheckpointCoordinator(pipelineId).startSavepoint(); } + public void reportedPipelineRunning(int pipelineId) { + getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint(); + } + private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) { return getCheckpointCoordinator(taskLocation.getPipelineId()); } @@ -147,9 +151,9 @@ private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) { * Called by the {@link Task}. *
used by Task to report the {@link SeaTunnelTaskState} of the state machine. */ - public void reportedTask(TaskReportStatusOperation reportStatusOperation, Address address) { + public void reportedTask(TaskReportStatusOperation reportStatusOperation) { // task address may change during restore. - subtaskWithAddresses.put(reportStatusOperation.getLocation().getTaskID(), address); + log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(), reportStatusOperation.getStatus()); getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation); } @@ -211,6 +215,6 @@ public void acknowledgeTask(TaskAcknowledgeOperation ackOperation) { } protected InvocationFuture sendOperationToMemberNode(TaskOperation operation) { - return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, subtaskWithAddresses.get(operation.getTaskLocation().getTaskID())); + return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId())); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java index d5ea0d078cb..171353e6bdf 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java @@ -67,6 +67,6 @@ public void run() { ((SeaTunnelServer) getService()) .getCoordinatorService().getJobMaster(location.getJobId()) .getCheckpointManager() - .reportedTask(this, getCallerAddress()); + .reportedTask(this); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 2df8df9c403..83a2af5608e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -151,6 +151,7 @@ public void init(long initializationTimestamp) throws Exception { this.checkpointManager = new CheckpointManager( jobImmutableInformation.getJobId(), nodeEngine, + this, planTuple.f1(), checkpointConfig); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java index 0ee8bbd80a6..c281a15f550 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java @@ -64,6 +64,7 @@ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException CheckpointManager checkpointManager = new CheckpointManager( jobId, nodeEngine, + null, planMap, new CheckpointConfig()); Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));