Skip to content

Commit

Permalink
Add ifQueued callback to ConcurrencyMeter#request.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 675933644
Change-Id: I887247df85e8a84c4356fe0f4ff2ae98226b4c05
  • Loading branch information
coeuvre authored and copybara-github committed Sep 18, 2024
1 parent ae46abf commit 75f9799
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/clock",
"//third_party:error_prone_annotations",
"//third_party:guava",
"//third_party:jsr305",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

/**
* A dispenser for up to 'total' simultaneous units of some resource. The resource itself is not
Expand Down Expand Up @@ -72,6 +73,10 @@ synchronized int queueSize() {
return queue.size();
}

public ListenableFuture<Ticket> request(long quantity, long priority) {
return request(quantity, priority, () -> {});
}

/**
* Enqueues a request for {@code quantity} units of the resource managed by this meter. When the
* request is filled, the result becomes available.
Expand All @@ -81,21 +86,52 @@ synchronized int queueSize() {
*
* @param quantity number of units of resources to acquire
* @param priority requests with greater priority complete earlier
* @param ifQueued a callback to be executed if the request is queued
* @return a future which grants resources only when it completes successfully
*/
public ListenableFuture<Ticket> request(long quantity, long priority) {
public ListenableFuture<Ticket> request(long quantity, long priority, Runnable ifQueued) {
checkArgument(quantity >= 0);
PendingJob job = new PendingJob(quantity, priority);
synchronized (this) {
queue.add(job);
ReleasingTicket ticket = maybeLease(job);
if (ticket != null) {
setTicket(job, ticket);
} else {
ifQueued.run();
synchronized (this) {
queue.add(job);
}
}
schedule();
return job.futureTicket;
}

@Nullable
private synchronized ReleasingTicket maybeLease(PendingJob job) {
if (leased + job.quantity > total && leased > 0) {
return null;
}

leased += job.quantity;

if (leased >= maxLeased) {
maxLeased = leased;
maxLeasedTimestamp = clock.now();
}

return new ReleasingTicket(job.quantity);
}

private void setTicket(PendingJob job, ReleasingTicket ticket) {
if (!job.futureTicket.set(ticket)) {
// The future may have been cancelled. Release immediately. If the build was interrupted, we
// may encounter a long chain of cancelled tickets - avoid calling ticket.done() or
// releaseAndSchedule() which would process them recursively.
release(job.quantity);
}
}

/** Statistics about a ConcurrencyMeter. */
public record Stats(
String name, long total, long leased, long maxLeased, long maxLeasedTimeMs) {}
public record Stats(String name, long total, long leased, long maxLeased, long maxLeasedTimeMs) {}

public synchronized Stats getStats() {
return new Stats(
Expand All @@ -115,29 +151,19 @@ private void releaseAndSchedule(long quantity) {
private void schedule() {
while (true) {
PendingJob job;
ReleasingTicket ticket;
synchronized (this) {
job = queue.peek();
if (job == null || (leased + job.quantity > total && leased > 0)) {
if (job == null || (ticket = maybeLease(job)) == null) {
return;
}
queue.remove();
leased += job.quantity;

if (leased >= maxLeased) {
maxLeased = leased;
maxLeasedTimestamp = clock.now();
}
}

// Set the future outside synchronized block to avoid holding the lock when executing future's
// callbacks which may hold other locks and call into ConcurrencyMeter causing deadlocks.
// See: b/319411390
if (!job.futureTicket.set(new ReleasingTicket(job.quantity))) {
// The future may have been cancelled. Release immediately. If the build was interrupted, we
// may encounter a long chain of cancelled tickets - avoid calling ticket.done() or
// releaseAndSchedule() which would process them recursively.
release(job.quantity);
}
setTicket(job, ticket);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.Future.State;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -46,30 +47,35 @@ private static void assertFutureIsSuccessful(Future<?> future) {
@Test
public void testGrant() throws Exception {
ConcurrencyMeter scheduler = new ConcurrencyMeter("meter", 3, BlazeClock.instance());
AtomicBoolean isQueued = new AtomicBoolean(false);

ListenableFuture<Ticket> req1 = scheduler.request(2, 0);
ListenableFuture<Ticket> req1 = scheduler.request(2, 0, () -> isQueued.set(true));
assertFutureIsSuccessful(req1);
assertThat(scheduler.queueSize()).isEqualTo(0);
assertThat(isQueued.get()).isFalse();
req1.get().done();

ListenableFuture<Ticket> req2 = scheduler.request(2, 0);
ListenableFuture<Ticket> req2 = scheduler.request(2, 0, () -> isQueued.set(true));
assertFutureIsSuccessful(req2);

ListenableFuture<Ticket> req3 = scheduler.request(1, 0);
ListenableFuture<Ticket> req3 = scheduler.request(1, 0, () -> isQueued.set(true));
assertFutureIsSuccessful(req3);
assertThat(isQueued.get()).isFalse();
assertThat(scheduler.queueSize()).isEqualTo(0);
}

@Test
public void testBlock() throws Exception {
ConcurrencyMeter scheduler = new ConcurrencyMeter("meter", 3, BlazeClock.instance());
AtomicBoolean isQueued = new AtomicBoolean(false);

ListenableFuture<Ticket> req1 = scheduler.request(2, 0);
ListenableFuture<Ticket> req1 = scheduler.request(2, 0, () -> isQueued.set(true));
assertFutureIsSuccessful(req1);

ListenableFuture<Ticket> req2 = scheduler.request(2, 0);
ListenableFuture<Ticket> req2 = scheduler.request(2, 0, () -> isQueued.set(true));
assertThat(req2.isDone()).isFalse();
assertThat(scheduler.queueSize()).isEqualTo(1);
assertThat(isQueued.get()).isTrue();

req1.get().done();
assertFutureIsSuccessful(req2);
Expand Down

0 comments on commit 75f9799

Please sign in to comment.