diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java index d1eaab5cc3da5..f00429ffee6ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java @@ -75,7 +75,7 @@ public interface TaskMailbox { int MIN_PRIORITY = -1; /** - * The maximal priority for mails. This priority indicates that the message should be performed + * The maximal priority for mails. This priority indicates that the mail should be performed * before any mail associated with an operator. */ int MAX_PRIORITY = Integer.MAX_VALUE; @@ -97,26 +97,32 @@ public interface TaskMailbox { boolean hasMail(); /** - * Returns an optional with either the oldest mail from the mailbox (head of queue) if the - * mailbox is not empty or an empty optional otherwise. + * Returns an optional with either the oldest mail from the mailbox if the mailbox is not empty + * or an empty optional otherwise. Note that the priority is given to retrieving email from the + * head of batch, and when email cannot be retrieved from the batch, it is retrieved from the + * head of queue. This also means that emails in batch are older than emails in queue. * *
Must be called from the mailbox thread ({@link #isMailboxThread()}.
*
* @return an optional with either the oldest mail from the mailbox (head of queue) if the
* mailbox is not empty or an empty optional otherwise.
- * @throws IllegalStateException if mailbox is already closed.
+ * @throws MailboxClosedException if mailbox is already closed.
+ * @throws IllegalStateException if called from non-mailbox thread.
*/
Optional Must be called from the mailbox thread ({@link #isMailboxThread()}.
*
* @return the oldest mail from the mailbox (head of queue).
* @throws InterruptedException on interruption.
- * @throws IllegalStateException if mailbox is already closed.
+ * @throws MailboxClosedException if mailbox is already closed.
+ * @throws IllegalStateException if called from non-mailbox thread.
*/
@Nonnull
Mail take(int priority) throws InterruptedException;
@@ -128,7 +134,7 @@ public interface TaskMailbox {
* not affect {@link #tryTake(int)} and {@link #take(int)}; that is, they return the same mails
* even if no batch had been created.
*
- * The default batch is empty. Thus, this method must be invoked once before {@link
+ * By default, the batch is empty. Thus, this method must be invoked once before {@link
* #tryTakeFromBatch()}.
*
* If a batch is not completely consumed by {@link #tryTakeFromBatch()}, its elements are
@@ -138,11 +144,12 @@ public interface TaskMailbox {
*
* @return true if there is at least one element in the batch; that is, if there is any mail at
* all at the time of the invocation.
+ * @throws IllegalStateException if called from non-mailbox thread.
*/
boolean createBatch();
/**
- * Returns an optional with either the oldest mail from the batch (head of queue) if the batch
+ * Returns an optional with either the oldest mail from the batch (head of batch) if the batch
* is not empty or an empty optional otherwise.
*
* Must be called from the mailbox thread ({@link #isMailboxThread()}.
@@ -150,9 +157,10 @@ public interface TaskMailbox {
* Note that there is no blocking {@code takeFromBatch} as batches can only be created and
* consumed from the mailbox thread.
*
- * @return an optional with either the oldest mail from the batch (head of queue) if the batch
+ * @return an optional with either the oldest mail from the batch (head of batch) if the batch
* is not empty or an empty optional otherwise.
* @throws MailboxClosedException if mailbox is already closed.
+ * @throws IllegalStateException if called from non-mailbox thread.
*/
Optional