Skip to content

Commit

Permalink
Support remote execution and BES reporting via unix domain sockets.
Browse files Browse the repository at this point in the history
This extends the existing `--remote_proxy` flag to also apply to gRPC channels created for `--remote_executor` and `--remote_cache`. It also adds a `--bes_proxy` flag, which applies to gRPC channels created by BES.

Logic in `newNettyChannelBuilder()` was derived from the existing unix socket setup in `HttpBlobStore`.

Closes #9872.

PiperOrigin-RevId: 274561572
  • Loading branch information
jmillikin-stripe authored and copybara-github committed Oct 14, 2019
1 parent 6d1b972 commit d9fe1d4
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.grpc.CallCredentials;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.ssl.SslContext;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -42,8 +50,8 @@ public final class GoogleAuthUtils {
*
* @throws IOException in case the channel can't be constructed.
*/
public static ManagedChannel newChannel(String target, AuthAndTLSOptions options,
ClientInterceptor... interceptors)
public static ManagedChannel newChannel(
String target, String proxy, AuthAndTLSOptions options, ClientInterceptor... interceptors)
throws IOException {
Preconditions.checkNotNull(target);
Preconditions.checkNotNull(options);
Expand All @@ -56,10 +64,9 @@ public static ManagedChannel newChannel(String target, AuthAndTLSOptions options

try {
NettyChannelBuilder builder =
NettyChannelBuilder.forTarget(targetUrl)
newNettyChannelBuilder(targetUrl, proxy)
.negotiationType(
isTlsEnabled(target) ? NegotiationType.TLS : NegotiationType.PLAINTEXT)
.defaultLoadBalancingPolicy("round_robin")
.intercept(interceptors);
if (sslContext != null) {
builder.sslContext(sslContext);
Expand Down Expand Up @@ -112,6 +119,33 @@ private static SslContext createSSlContext(@Nullable String rootCert) throws IOE
}
}

private static NettyChannelBuilder newNettyChannelBuilder(String targetUrl, String proxy)
throws IOException {
if (Strings.isNullOrEmpty(proxy)) {
return NettyChannelBuilder.forTarget(targetUrl).defaultLoadBalancingPolicy("round_robin");
}

if (!proxy.startsWith("unix:")) {
throw new IOException("Remote proxy unsupported: " + proxy);
}

DomainSocketAddress address = new DomainSocketAddress(proxy.replaceFirst("^unix:", ""));
NettyChannelBuilder builder =
NettyChannelBuilder.forAddress(address).overrideAuthority(targetUrl);
if (KQueue.isAvailable()) {
return builder
.channelType(KQueueDomainSocketChannel.class)
.eventLoopGroup(new KQueueEventLoopGroup());
}
if (Epoll.isAvailable()) {
return builder
.channelType(EpollDomainSocketChannel.class)
.eventLoopGroup(new EpollEventLoopGroup());
}

throw new IOException("Unix domain sockets are unsupported on this platform");
}

/**
* Create a new {@link CallCredentials} object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ protected BuildEventServiceClient getBesClient(
@VisibleForTesting
protected ManagedChannel newGrpcChannel(
BuildEventServiceOptions besOptions, AuthAndTLSOptions authAndTLSOptions) throws IOException {
return GoogleAuthUtils.newChannel(besOptions.besBackend, authAndTLSOptions);
return GoogleAuthUtils.newChannel(
besOptions.besBackend, besOptions.besProxy, authAndTLSOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ public class BuildEventServiceOptions extends OptionsBase {
+ "or should end the invocation immediately and finish the upload in the background.")
public BesUploadMode besUploadMode;

@Option(
name = "bes_proxy",
defaultValue = "null",
documentationCategory = OptionDocumentationCategory.LOGGING,
effectTags = {OptionEffectTag.UNKNOWN},
help =
"Connect to the Build Event Service through a proxy. Currently this flag can only be"
+ " used to configure a Unix domain socket (unix:/path/to/socket).")
public String besProxy;

/** Determines the mode that will be used to upload data to the Build Event Service. */
public enum BesUploadMode {
/** Block at the end of the build waiting for the upload to complete */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
new ReferenceCountedChannel(
GoogleAuthUtils.newChannel(
remoteOptions.remoteExecutor,
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.toArray(new ClientInterceptor[0])));
}
Expand All @@ -218,6 +219,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
new ReferenceCountedChannel(
GoogleAuthUtils.newChannel(
remoteOptions.remoteCache,
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.toArray(new ClientInterceptor[0])));
} else { // Assume --remote_cache is equal to --remote_executor by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public final class RemoteOptions extends OptionsBase {
effectTags = {OptionEffectTag.UNKNOWN},
help =
"Connect to the remote cache through a proxy. Currently this flag can only be used to "
+ "configure a Unix domain socket (unix:/path/to/socket) for the HTTP cache.")
+ "configure a Unix domain socket (unix:/path/to/socket).")
public String remoteProxy;

@Option(
Expand Down
1 change: 1 addition & 0 deletions src/test/shell/bazel/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ sh_test(
timeout = "eternal",
srcs = ["remote_execution_test.sh"],
data = [
":uds_proxy.py",
"//src/test/shell/bazel:test-deps",
"//src/tools/remote:worker",
],
Expand Down
37 changes: 37 additions & 0 deletions src/test/shell/bazel/remote/remote_execution_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,43 @@ EOF
|| fail "Failed to build //a:foo with remote cache"
}

function test_remote_grpc_via_unix_socket() {
case "$PLATFORM" in
darwin|freebsd|linux)
;;
*)
return 0
;;
esac

# Test that remote execution can be routed via a UNIX domain socket if
# supported by the platform.
mkdir -p a
cat > a/BUILD <<EOF
genrule(
name = 'foo',
outs = ["foo.txt"],
cmd = "echo \"foo bar\" > \$@",
)
EOF

# Note: not using $TEST_TMPDIR because many OSes, notably macOS, have
# small maximum length limits for UNIX domain sockets.
socket_dir=$(mktemp -d -t "remote_executor.XXXXXXXX")
python "${CURRENT_DIR}/uds_proxy.py" "${socket_dir}/executor-socket" "localhost:${worker_port}" &
proxy_pid=$!

bazel build \
--remote_executor=grpc://noexist.invalid \
--remote_proxy="unix:${socket_dir}/executor-socket" \
//a:foo \
|| fail "Failed to build //a:foo with remote cache"

kill ${proxy_pid}
rm "${socket_dir}/executor-socket"
rmdir "${socket_dir}"
}

function test_cc_binary() {
if [[ "$PLATFORM" == "darwin" ]]; then
# TODO(b/37355380): This test is disabled due to RemoteWorker not supporting
Expand Down
63 changes: 63 additions & 0 deletions src/test/shell/bazel/remote/uds_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2019 The Bazel Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Simple replaying Unix Domain Socket (UDS) server."""

import contextlib
import select
import socket
import sys
import threading


def main(argv=None):
if argv is None:
argv = sys.argv[1:]

srv_address = argv[0]
dst_host, dst_port = argv[1].split(":")
dst_address = (dst_host, int(dst_port))

srv_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
srv_sock.bind(srv_address)
srv_sock.listen(64)
while True:
src_sock, _ = srv_sock.accept()
proxy_thread = threading.Thread(target=proxy, args=[src_sock, dst_address])
proxy_thread.daemon = True
proxy_thread.start()


def proxy(src_sock, dst_address):
"""Read data from a USD socket and write it back."""
with contextlib.closing(src_sock):
src_sock.settimeout(None)
dst_sock = socket.create_connection(dst_address)
with contextlib.closing(dst_sock):
dst_sock.settimeout(None)

while True:
readable, _, _ = select.select([src_sock, dst_sock], [], [])
if src_sock in readable:
data = src_sock.recv(4096)
if not data:
return
dst_sock.sendall(data)
if dst_sock in readable:
data = dst_sock.recv(4096)
if data:
src_sock.sendall(data)


if __name__ == "__main__":
sys.exit(main())

0 comments on commit d9fe1d4

Please sign in to comment.