Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Removed GrpcClientBuilder#MultiClientBuilder (#1809)" #2127

Merged
merged 2 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,43 @@ default HttpInitializer<U, R> append(HttpInitializer<U, R> toAppend) {
*/
<BlockingClient extends BlockingGrpcClient<?>> BlockingClient buildBlocking(
GrpcClientFactory<?, BlockingClient> clientFactory);

/**
* Returns a {@link MultiClientBuilder} to be used to create multiple clients that share the underlying transport.
* It is meant for a single backend that hosts different service APIs.
*
* @return A {@link MultiClientBuilder builder} that allows reusing underlying transport between
* <a href="https://www.grpc.io">gRPC</a> clients.
*/
MultiClientBuilder buildMulti();

/**
* An interface to create multiple <a href="https://www.grpc.io">gRPC</a> clients that share the underlying
* transport. It is meant for a single backend that hosts different service APIs.
*/
interface MultiClientBuilder {

/**
* Builds a <a href="https://www.grpc.io">gRPC</a> client.
*
* @param clientFactory {@link GrpcClientFactory} to use.
* @param <Client> <a href="https://www.grpc.io">gRPC</a> service that any client built
* from this factory represents.
*
* @return A <a href="https://www.grpc.io">gRPC</a> client.
*/
<Client extends GrpcClient<?>> Client build(GrpcClientFactory<Client, ?> clientFactory);

/**
* Builds a blocking <a href="https://www.grpc.io">gRPC</a> client.
*
* @param clientFactory {@link GrpcClientFactory} to use.
* @param <BlockingClient> Blocking <a href="https://www.grpc.io">gRPC</a> service that
* any client built from this builder represents.
*
* @return A blocking <a href="https://www.grpc.io">gRPC</a> client.
*/
<BlockingClient extends BlockingGrpcClient<?>> BlockingClient buildBlocking(
GrpcClientFactory<?, BlockingClient> clientFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,24 @@ public <BlockingClient extends BlockingGrpcClient<?>> BlockingClient buildBlocki
return clientFactory.newBlockingClientForCallFactory(newGrpcClientCallFactory());
}

@Override
public MultiClientBuilder buildMulti() {
final GrpcClientCallFactory callFactory = newGrpcClientCallFactory();

return new MultiClientBuilder() {
@Override
public <Client extends GrpcClient<?>> Client build(final GrpcClientFactory<Client, ?> clientFactory) {
return clientFactory.newClientForCallFactory(callFactory);
}

@Override
public <BlockingClient extends BlockingGrpcClient<?>> BlockingClient buildBlocking(
final GrpcClientFactory<?, BlockingClient> clientFactory) {
return clientFactory.newBlockingClientForCallFactory(callFactory);
}
};
}

private GrpcClientCallFactory newGrpcClientCallFactory() {
SingleAddressHttpClientBuilder<U, R> builder = httpClientBuilderSupplier.get().protocols(h2Default());
builder.appendClientFilter(CatchAllHttpClientFilter.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.grpc.netty;

import io.servicetalk.client.api.TransportObserverConnectionFactoryFilter;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.grpc.api.GrpcClientBuilder;
import io.servicetalk.grpc.api.GrpcServiceContext;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.NoopTransportObserver;

import io.grpc.examples.helloworld.Greeter;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

class MultiClientTest {

@Nullable
private ServerContext serverContext;
@Nullable
private GrpcClientBuilder<HostAndPort, InetSocketAddress> clientBuilder;

final CountingConnectionObserver countingObserver = new CountingConnectionObserver();

@BeforeEach
void setUp() throws Exception {
final TesterProto.Tester.TesterService service1 = new TesterProto.Tester.TesterService() {
@Override
public Publisher<TesterProto.TestResponse> testBiDiStream(
final GrpcServiceContext ctx, final Publisher<TesterProto.TestRequest> request) {
throw new UnsupportedOperationException();
}

@Override
public Single<TesterProto.TestResponse> testRequestStream(
final GrpcServiceContext ctx, final Publisher<TesterProto.TestRequest> request) {
throw new UnsupportedOperationException();
}

@Override
public Publisher<TesterProto.TestResponse> testResponseStream(
final GrpcServiceContext ctx, final TesterProto.TestRequest request) {
throw new UnsupportedOperationException();
}

@Override
public Single<TesterProto.TestResponse> test(
final GrpcServiceContext ctx, final TesterProto.TestRequest request) {
return Single.fromSupplier(() -> TesterProto.TestResponse.newBuilder().setMessage("response").build());
}
};
final Greeter.GreeterService service2 = new Greeter.GreeterService() {
@Override
public Single<HelloReply> sayHello(final GrpcServiceContext ctx, final HelloRequest request) {
return Single.fromSupplier(() -> HelloReply.newBuilder().setMessage("response").build());
}
};
serverContext = GrpcServers.forAddress(localAddress(0)).listenAndAwait(service1, service2);

countingObserver.count.set(0);

clientBuilder = GrpcClients.forAddress(serverHostAndPort(serverContext)).initializeHttp(builder -> builder
.appendConnectionFactoryFilter(new TransportObserverConnectionFactoryFilter<>(countingObserver))
);
}

@Test
void underlyingTransportIsSharedBetweenClients() throws Exception {
final GrpcClientBuilder.MultiClientBuilder multiClientBuilder = clientBuilder.buildMulti();

final TesterProto.Tester.BlockingTesterClient testerClient =
multiClientBuilder.buildBlocking(new TesterProto.Tester.ClientFactory());
final Greeter.BlockingGreeterClient greeterClient =
multiClientBuilder.buildBlocking(new Greeter.ClientFactory());

testerClient.test(TesterProto.TestRequest.newBuilder().build());
greeterClient.sayHello(HelloRequest.newBuilder().build());

assertThat(countingObserver.count.get(), equalTo(1));
}

private static class CountingConnectionObserver implements TransportObserver {

private final AtomicInteger count = new AtomicInteger();

@Override
public ConnectionObserver onNewConnection(@Nullable final Object localAddress, final Object remoteAddress) {
count.incrementAndGet();
return NoopTransportObserver.NoopConnectionObserver.INSTANCE;
}
}
}