Skip to content

Commit

Permalink
Remote: gRPC load balancing. (Part 2)
Browse files Browse the repository at this point in the history
Add a TokenBucket which is used later for rate limiting in connection pool.

PiperOrigin-RevId: 357946500
  • Loading branch information
Googler authored and copybara-github committed Feb 17, 2021
1 parent af3a556 commit e2b9a42
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ java_library(
name = "grpc",
srcs = glob(["*.java"]),
deps = [
"//src/main/java/com/google/devtools/build/lib/concurrent",
"//third_party:guava",
"//third_party:rxjava3",
"//third_party/grpc:grpc-jar",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.grpc;

import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import io.reactivex.rxjava3.core.Single;

/**
Expand All @@ -25,7 +26,10 @@
*
* <p>Connection creation must be cancellable. Canceling connection creation must release (“close”)
* the connection and all associated resources.
*
* <p>Implementations must be thread-safe.
*/
@ThreadSafe
public interface ConnectionFactory {
/** Creates a new {@link Connection}. */
Single<? extends Connection> create();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.grpc;

import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedDeque;

/** A container for tokens which is used for rate limiting. */
@ThreadSafe
public class TokenBucket<T> implements Closeable {
private final ConcurrentLinkedDeque<T> tokens;
private final BehaviorSubject<T> tokenBehaviorSubject;

public TokenBucket() {
this(ImmutableList.of());
}

public TokenBucket(Collection<T> initialTokens) {
tokens = new ConcurrentLinkedDeque<>(initialTokens);
tokenBehaviorSubject = BehaviorSubject.create();

if (!tokens.isEmpty()) {
tokenBehaviorSubject.onNext(tokens.getFirst());
}
}

/** Add a token to the bucket. */
public void addToken(T token) {
tokens.addLast(token);
tokenBehaviorSubject.onNext(token);
}

/** Returns current number of tokens in the bucket. */
public int size() {
return tokens.size();
}

/**
* Returns a cold {@link Single} which will start the token acquisition process upon subscription.
*/
public Single<T> acquireToken() {
return Single.create(
downstream ->
tokenBehaviorSubject.subscribe(
new Observer<T>() {
Disposable upstream;

@Override
public void onSubscribe(@NonNull Disposable d) {
upstream = d;
downstream.setDisposable(d);
}

@Override
public void onNext(@NonNull T ignored) {
if (!downstream.isDisposed()) {
T token = tokens.pollFirst();
if (token != null) {
downstream.onSuccess(token);
}
}
}

@Override
public void onError(@NonNull Throwable e) {
downstream.onError(new IllegalStateException(e));
}

@Override
public void onComplete() {
if (!downstream.isDisposed()) {
downstream.onError(new IllegalStateException("closed"));
}
}
}));
}

/**
* Closes the bucket and release all the tokens.
*
* <p>Subscriptions after closed to the Single returned by {@link TokenBucket#acquireToken()} will
* emit error.
*/
@Override
public void close() throws IOException {
tokens.clear();
tokenBehaviorSubject.onComplete();
}
}
1 change: 1 addition & 0 deletions src/test/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ filegroup(
srcs = glob(["**"]) + [
"//src/test/java/com/google/devtools/build/lib/remote/downloader:srcs",
"//src/test/java/com/google/devtools/build/lib/remote/http:srcs",
"//src/test/java/com/google/devtools/build/lib/remote/grpc:srcs",
"//src/test/java/com/google/devtools/build/lib/remote/logging:srcs",
"//src/test/java/com/google/devtools/build/lib/remote/merkletree:srcs",
"//src/test/java/com/google/devtools/build/lib/remote/options:srcs",
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/com/google/devtools/build/lib/remote/grpc/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@rules_java//java:defs.bzl", "java_test")

package(
default_testonly = 1,
default_visibility = ["//src:__subpackages__"],
)

filegroup(
name = "srcs",
testonly = 0,
srcs = glob(["**"]),
visibility = ["//src/test/java/com/google/devtools/build/lib/remote:__pkg__"],
)

java_test(
name = "grpc",
srcs = glob(["*.java"]),
tags = [
"requires-network",
],
test_class = "com.google.devtools.build.lib.AllTests",
deps = [
"//src/main/java/com/google/devtools/build/lib/remote/grpc",
"//src/test/java/com/google/devtools/build/lib:test_runner",
"//third_party:guava",
"//third_party:junit4",
"//third_party:mockito",
"//third_party:rxjava3",
"//third_party:truth",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.grpc;

import static com.google.common.truth.Truth.assertThat;

import com.google.common.collect.ImmutableList;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.observers.TestObserver;
import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link TokenBucket} */
@RunWith(JUnit4.class)
public class TokenBucketTest {

@Test
public void acquireToken_smoke() {
TokenBucket<Integer> bucket = new TokenBucket<>();
assertThat(bucket.size()).isEqualTo(0);
bucket.addToken(0);
assertThat(bucket.size()).isEqualTo(1);

TestObserver<Integer> observer = bucket.acquireToken().test();

observer.assertValue(0).assertComplete();
assertThat(bucket.size()).isEqualTo(0);
}

@Test
public void acquireToken_releaseInitialTokens() {
TokenBucket<Integer> bucket = new TokenBucket<>(ImmutableList.of(0));
assertThat(bucket.size()).isEqualTo(1);

TestObserver<Integer> observer = bucket.acquireToken().test();

observer.assertValue(0).assertComplete();
assertThat(bucket.size()).isEqualTo(0);
}

@Test
public void acquireToken_multipleInitialTokens_releaseFirstToken() {
TokenBucket<Integer> bucket = new TokenBucket<>(ImmutableList.of(0, 1));
assertThat(bucket.size()).isEqualTo(2);

TestObserver<Integer> observer = bucket.acquireToken().test();

observer.assertValue(0).assertComplete();
assertThat(bucket.size()).isEqualTo(1);
}

@Test
public void acquireToken_multipleInitialTokens_releaseSecondToken() {
TokenBucket<Integer> bucket = new TokenBucket<>(ImmutableList.of(0, 1));
assertThat(bucket.size()).isEqualTo(2);
bucket.acquireToken().test().assertValue(0).assertComplete();

TestObserver<Integer> observer = bucket.acquireToken().test();

observer.assertValue(1).assertComplete();
assertThat(bucket.size()).isEqualTo(0);
}

@Test
public void acquireToken_releaseTokenToPreviousObserver() {
TokenBucket<Integer> bucket = new TokenBucket<>();
TestObserver<Integer> observer = bucket.acquireToken().test();
observer.assertEmpty();

bucket.addToken(0);

observer.assertValue(0).assertComplete();
assertThat(bucket.size()).isEqualTo(0);
}

@Test
public void acquireToken_notReleaseTokenToDisposedObserver() {
TokenBucket<Integer> bucket = new TokenBucket<>();
TestObserver<Integer> observer = bucket.acquireToken().test();

observer.dispose();
bucket.addToken(0);

observer.assertEmpty();
assertThat(bucket.size()).isEqualTo(1);
}

@Test
public void acquireToken_disposeAfterTokenAcquired() {
TokenBucket<Integer> bucket = new TokenBucket<>();
TestObserver<Integer> observer = bucket.acquireToken().test();

bucket.addToken(0);
bucket.addToken(1);

observer.assertValue(0).assertComplete();
assertThat(bucket.size()).isEqualTo(1);
}

@Test
public void acquireToken_multipleObservers_onlyOneCanAcquire() {
TokenBucket<Integer> bucket = new TokenBucket<>();
TestObserver<Integer> observer1 = bucket.acquireToken().test();
TestObserver<Integer> observer2 = bucket.acquireToken().test();

bucket.addToken(0);

if (!observer1.values().isEmpty()) {
observer1.assertValue(0).assertComplete();
observer2.assertEmpty();

bucket.addToken(1);
observer2.assertValue(1).assertComplete();
} else {
observer1.assertEmpty();
observer2.assertValue(0).assertComplete();

bucket.addToken(1);
observer1.assertValue(1).assertComplete();
}
}

@Test
public void acquireToken_reSubscription_waitAvailableToken() {
TokenBucket<Integer> bucket = new TokenBucket<>();
bucket.addToken(0);
Single<Integer> tokenSingle = bucket.acquireToken();

TestObserver<Integer> observer1 = tokenSingle.test();
TestObserver<Integer> observer2 = tokenSingle.test();

observer1.assertValue(0).assertComplete();
observer2.assertEmpty();
}

@Test
public void acquireToken_reSubscription_acquireNewToken() {
TokenBucket<Integer> bucket = new TokenBucket<>();
bucket.addToken(0);
Single<Integer> tokenSingle = bucket.acquireToken();
TestObserver<Integer> observer1 = tokenSingle.test();
TestObserver<Integer> observer2 = tokenSingle.test();

bucket.addToken(1);

observer1.assertValue(0).assertComplete();
observer2.assertValue(1).assertComplete();
}

@Test
public void acquireToken_reSubscription_acquireNextToken() {
TokenBucket<Integer> bucket = new TokenBucket<>();
bucket.addToken(0);
bucket.addToken(1);
Single<Integer> tokenSingle = bucket.acquireToken();

TestObserver<Integer> observer1 = tokenSingle.test();
TestObserver<Integer> observer2 = tokenSingle.test();

observer1.assertValue(0).assertComplete();
observer2.assertValue(1).assertComplete();
}

@Test
public void acquireToken_disposed_tokenRemains() {
TokenBucket<Integer> bucket = new TokenBucket<>();
TestObserver<Integer> observer = bucket.acquireToken().test();
observer.assertEmpty();

observer.dispose();
bucket.addToken(0);

assertThat(bucket.size()).isEqualTo(1);
}

@Test
public void close_errorAfterClose() throws IOException {
TokenBucket<Integer> bucket = new TokenBucket<>();
bucket.addToken(0);
bucket.close();

TestObserver<Integer> observer = bucket.acquireToken().test();

observer.assertError(
e -> e instanceof IllegalStateException && e.getMessage().contains("closed"));
}

@Test
public void close_errorPreviousObservers() throws IOException {
TokenBucket<Integer> bucket = new TokenBucket<>();
TestObserver<Integer> observer = bucket.acquireToken().test();

bucket.close();

observer.assertError(
e -> e instanceof IllegalStateException && e.getMessage().contains("closed"));
}
}

0 comments on commit e2b9a42

Please sign in to comment.