Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public interface TaskMailbox {
int MIN_PRIORITY = -1;

/**
* The maximal priority for mails. This priority indicates that the message should be performed
* The maximal priority for mails. This priority indicates that the mail should be performed
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message -> mail. mail is a more suitable name in the context of TaskMailbox.

* before any mail associated with an operator.
*/
int MAX_PRIORITY = Integer.MAX_VALUE;
Expand All @@ -97,26 +97,32 @@ public interface TaskMailbox {
boolean hasMail();

/**
* Returns an optional with either the oldest mail from the mailbox (head of queue) if the
* mailbox is not empty or an empty optional otherwise.
* Returns an optional with either the oldest mail from the mailbox if the mailbox is not empty
* or an empty optional otherwise. Note that the priority is given to retrieving email from the
* head of batch, and when email cannot be retrieved from the batch, it is retrieved from the
* head of queue. This also means that emails in batch are older than emails in queue.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adds comments to increasing the readability of tryTake and take. These comments show below.
Note that the priority is given to retrieving email from the head of batch, and when email cannot be retrieved from the batch, it is retrieved from the head of queue. This also means that emails in batch are older than emails in queue.

*
* <p>Must be called from the mailbox thread ({@link #isMailboxThread()}.
*
* @return an optional with either the oldest mail from the mailbox (head of queue) if the
* mailbox is not empty or an empty optional otherwise.
* @throws IllegalStateException if mailbox is already closed.
* @throws MailboxClosedException if mailbox is already closed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Supplement the missing exception into comments.

* @throws IllegalStateException if called from non-mailbox thread.
*/
Optional<Mail> tryTake(int priority);

/**
* This method returns the oldest mail from the mailbox (head of queue) or blocks until a mail
* is available.
* is available. Note that the priority is given to retrieving email from the head of batch, and
* when email cannot be retrieved from the batch, it is retrieved from the head of queue. This
* also means that emails in batch are older than emails in queue.
*
* <p>Must be called from the mailbox thread ({@link #isMailboxThread()}.
*
* @return the oldest mail from the mailbox (head of queue).
* @throws InterruptedException on interruption.
* @throws IllegalStateException if mailbox is already closed.
* @throws MailboxClosedException if mailbox is already closed.
* @throws IllegalStateException if called from non-mailbox thread.
*/
@Nonnull
Mail take(int priority) throws InterruptedException;
Expand All @@ -128,7 +134,7 @@ public interface TaskMailbox {
* not affect {@link #tryTake(int)} and {@link #take(int)}; that is, they return the same mails
* even if no batch had been created.
*
* <p>The default batch is empty. Thus, this method must be invoked once before {@link
* <p>By default, the batch is empty. Thus, this method must be invoked once before {@link
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default batch is empty. -> By default, the batch is empty.. The latter is more accurate.

* #tryTakeFromBatch()}.
*
* <p>If a batch is not completely consumed by {@link #tryTakeFromBatch()}, its elements are
Expand All @@ -138,21 +144,23 @@ public interface TaskMailbox {
*
* @return true if there is at least one element in the batch; that is, if there is any mail at
* all at the time of the invocation.
* @throws IllegalStateException if called from non-mailbox thread.
*/
boolean createBatch();

/**
* Returns an optional with either the oldest mail from the batch (head of queue) if the batch
* Returns an optional with either the oldest mail from the batch (head of batch) if the batch
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(head of queue) -> (head of batch). We only fetch the head mail from the batch not the queue with tryTakeFromBatch.

* is not empty or an empty optional otherwise.
*
* <p>Must be called from the mailbox thread ({@link #isMailboxThread()}.
*
* <p>Note that there is no blocking {@code takeFromBatch} as batches can only be created and
* consumed from the mailbox thread.
*
* @return an optional with either the oldest mail from the batch (head of queue) if the batch
* @return an optional with either the oldest mail from the batch (head of batch) if the batch
* is not empty or an empty optional otherwise.
* @throws MailboxClosedException if mailbox is already closed.
* @throws IllegalStateException if called from non-mailbox thread.
*/
Optional<Mail> tryTakeFromBatch();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,37 +96,26 @@ public boolean hasMail() {
return !batch.isEmpty() || hasNewMail;
}

@Override
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return batch.size() + queue.size();
} finally {
lock.unlock();
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the size() in TaskMailboxImpl so that it follows the order of the declaration in the interface TaskMailbox.

@Override
public Optional<Mail> tryTake(int priority) {
checkIsMailboxThread();
checkTakeStateConditions();
Mail head = takeOrNull(batch, priority);
if (head != null) {
return Optional.of(head);
Mail headEmail = takeOrNull(batch, priority);
if (headEmail != null) {
return Optional.of(headEmail);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

head -> headEmail. The latter is a better variable name. Some code uses headEmail and other uses head before this PR. We should unify the name.

}
if (!hasNewMail) {
return Optional.empty();
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Mail value = takeOrNull(queue, priority);
if (value == null) {
headEmail = takeOrNull(queue, priority);
if (headEmail == null) {
return Optional.empty();
}
hasNewMail = !queue.isEmpty();
return Optional.ofNullable(value);
return Optional.of(headEmail);
} finally {
lock.unlock();
}
Expand All @@ -136,20 +125,19 @@ public Optional<Mail> tryTake(int priority) {
public @Nonnull Mail take(int priority) throws InterruptedException, IllegalStateException {
checkIsMailboxThread();
checkTakeStateConditions();
Mail head = takeOrNull(batch, priority);
if (head != null) {
return head;
Mail headEmail = takeOrNull(batch, priority);
if (headEmail != null) {
return headEmail;
}
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
Mail headMail;
while ((headMail = takeOrNull(queue, priority)) == null) {
while ((headEmail = takeOrNull(queue, priority)) == null) {
// to ease debugging
notEmpty.await(1, TimeUnit.SECONDS);
}
hasNewMail = !queue.isEmpty();
return headMail;
return headEmail;
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -337,6 +325,17 @@ public State getState() {
}
}

@Override
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return batch.size() + queue.size();
} finally {
lock.unlock();
}
}

@Override
public void runExclusively(Runnable runnable) {
lock.lock();
Expand Down