From 7d4be19cdcb03e619232d2f5f177f8d5611b6f7c Mon Sep 17 00:00:00 2001 From: beliefer Date: Wed, 16 Apr 2025 17:37:45 +0800 Subject: [PATCH] [FLINK-37689] Improve the comments and variable names for TaskMailbox --- .../runtime/tasks/mailbox/TaskMailbox.java | 26 +++++++---- .../tasks/mailbox/TaskMailboxImpl.java | 45 +++++++++---------- 2 files changed, 39 insertions(+), 32 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java index d1eaab5cc3da5..f00429ffee6ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java @@ -75,7 +75,7 @@ public interface TaskMailbox { int MIN_PRIORITY = -1; /** - * The maximal priority for mails. This priority indicates that the message should be performed + * The maximal priority for mails. This priority indicates that the mail should be performed * before any mail associated with an operator. */ int MAX_PRIORITY = Integer.MAX_VALUE; @@ -97,26 +97,32 @@ public interface TaskMailbox { boolean hasMail(); /** - * Returns an optional with either the oldest mail from the mailbox (head of queue) if the - * mailbox is not empty or an empty optional otherwise. + * Returns an optional with either the oldest mail from the mailbox if the mailbox is not empty + * or an empty optional otherwise. Note that the priority is given to retrieving email from the + * head of batch, and when email cannot be retrieved from the batch, it is retrieved from the + * head of queue. This also means that emails in batch are older than emails in queue. * *

Must be called from the mailbox thread ({@link #isMailboxThread()}. * * @return an optional with either the oldest mail from the mailbox (head of queue) if the * mailbox is not empty or an empty optional otherwise. - * @throws IllegalStateException if mailbox is already closed. + * @throws MailboxClosedException if mailbox is already closed. + * @throws IllegalStateException if called from non-mailbox thread. */ Optional tryTake(int priority); /** * This method returns the oldest mail from the mailbox (head of queue) or blocks until a mail - * is available. + * is available. Note that the priority is given to retrieving email from the head of batch, and + * when email cannot be retrieved from the batch, it is retrieved from the head of queue. This + * also means that emails in batch are older than emails in queue. * *

Must be called from the mailbox thread ({@link #isMailboxThread()}. * * @return the oldest mail from the mailbox (head of queue). * @throws InterruptedException on interruption. - * @throws IllegalStateException if mailbox is already closed. + * @throws MailboxClosedException if mailbox is already closed. + * @throws IllegalStateException if called from non-mailbox thread. */ @Nonnull Mail take(int priority) throws InterruptedException; @@ -128,7 +134,7 @@ public interface TaskMailbox { * not affect {@link #tryTake(int)} and {@link #take(int)}; that is, they return the same mails * even if no batch had been created. * - *

The default batch is empty. Thus, this method must be invoked once before {@link + *

By default, the batch is empty. Thus, this method must be invoked once before {@link * #tryTakeFromBatch()}. * *

If a batch is not completely consumed by {@link #tryTakeFromBatch()}, its elements are @@ -138,11 +144,12 @@ public interface TaskMailbox { * * @return true if there is at least one element in the batch; that is, if there is any mail at * all at the time of the invocation. + * @throws IllegalStateException if called from non-mailbox thread. */ boolean createBatch(); /** - * Returns an optional with either the oldest mail from the batch (head of queue) if the batch + * Returns an optional with either the oldest mail from the batch (head of batch) if the batch * is not empty or an empty optional otherwise. * *

Must be called from the mailbox thread ({@link #isMailboxThread()}. @@ -150,9 +157,10 @@ public interface TaskMailbox { *

Note that there is no blocking {@code takeFromBatch} as batches can only be created and * consumed from the mailbox thread. * - * @return an optional with either the oldest mail from the batch (head of queue) if the batch + * @return an optional with either the oldest mail from the batch (head of batch) if the batch * is not empty or an empty optional otherwise. * @throws MailboxClosedException if mailbox is already closed. + * @throws IllegalStateException if called from non-mailbox thread. */ Optional tryTakeFromBatch(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java index 00d19220e6e07..611a58c1faef1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java @@ -96,24 +96,13 @@ public boolean hasMail() { return !batch.isEmpty() || hasNewMail; } - @Override - public int size() { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - return batch.size() + queue.size(); - } finally { - lock.unlock(); - } - } - @Override public Optional tryTake(int priority) { checkIsMailboxThread(); checkTakeStateConditions(); - Mail head = takeOrNull(batch, priority); - if (head != null) { - return Optional.of(head); + Mail headEmail = takeOrNull(batch, priority); + if (headEmail != null) { + return Optional.of(headEmail); } if (!hasNewMail) { return Optional.empty(); @@ -121,12 +110,12 @@ public Optional tryTake(int priority) { final ReentrantLock lock = this.lock; lock.lock(); try { - final Mail value = takeOrNull(queue, priority); - if (value == null) { + headEmail = takeOrNull(queue, priority); + if (headEmail == null) { return Optional.empty(); } hasNewMail = !queue.isEmpty(); - return Optional.ofNullable(value); + return Optional.of(headEmail); } finally { lock.unlock(); } @@ -136,20 +125,19 @@ public Optional tryTake(int priority) { public @Nonnull Mail take(int priority) throws InterruptedException, IllegalStateException { checkIsMailboxThread(); checkTakeStateConditions(); - Mail head = takeOrNull(batch, priority); - if (head != null) { - return head; + Mail headEmail = takeOrNull(batch, priority); + if (headEmail != null) { + return headEmail; } final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - Mail headMail; - while ((headMail = takeOrNull(queue, priority)) == null) { + while ((headEmail = takeOrNull(queue, priority)) == null) { // to ease debugging notEmpty.await(1, TimeUnit.SECONDS); } hasNewMail = !queue.isEmpty(); - return headMail; + return headEmail; } finally { lock.unlock(); } @@ -337,6 +325,17 @@ public State getState() { } } + @Override + public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return batch.size() + queue.size(); + } finally { + lock.unlock(); + } + } + @Override public void runExclusively(Runnable runnable) { lock.lock();