diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java index 50035e0b74a69..5ecaa24385e41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java @@ -109,7 +109,7 @@ public CompletableFuture allocateSlot( taskManager.get().getTaskExecutorConnection().getTaskExecutorGateway(); final ResourceID resourceId = taskManager.get().getTaskExecutorConnection().getResourceID(); - LOG.debug( + LOG.info( "Starting allocation of slot {} from {} for job {} with resource profile {}.", allocationId, resourceId, @@ -198,7 +198,7 @@ public CompletableFuture allocateSlot( public void freeSlot(AllocationID allocationId) { Preconditions.checkNotNull(allocationId); checkStarted(); - LOG.debug("Freeing slot {}.", allocationId); + LOG.info("Freeing slot {}.", allocationId); final Optional slotOptional = taskManagerTracker.getAllocatedOrPendingSlot(allocationId); @@ -250,6 +250,7 @@ public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) { // the next slot report or the acknowledgement of the allocation request. if (!reportedAllocationIds.contains(slot.getAllocationId()) && slot.getState() == SlotState.ALLOCATED) { + LOG.info("Freeing slot {} by slot report.", slot.getAllocationId()); taskManagerTracker.notifySlotStatus( slot.getAllocationId(), slot.getJobId(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java index f632357ddca81..cc097a4c32ae8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java @@ -303,7 +303,7 @@ public boolean registerTaskManager( ResourceProfile totalResourceProfile, ResourceProfile defaultSlotResourceProfile) { checkInit(); - LOG.debug( + LOG.info( "Registering task executor {} under {} at the slot manager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID()); @@ -400,7 +400,7 @@ private Optional findMatchingPendingTaskManager( public boolean unregisterTaskManager(InstanceID instanceId, Exception cause) { checkInit(); - LOG.debug("Unregistering task executor {} from the slot manager.", instanceId); + LOG.info("Unregistering task executor {} from the slot manager.", instanceId); if (taskManagerTracker.getRegisteredTaskManager(instanceId).isPresent()) { Set allocatedSlots = @@ -705,7 +705,7 @@ private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo taskManagerInfo) private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) { final FlinkException cause = new FlinkException("TaskManager exceeded the idle timeout."); - LOG.debug( + LOG.info( "Release TaskManager {} because it exceeded the idle timeout.", timedOutTaskManagerId); resourceActions.releaseResource(timedOutTaskManagerId, cause);