Skip to content

Commit

Permalink
Write/QueryWriteStatus logging refinement/addition
Browse files Browse the repository at this point in the history
Improve the logging of WriteRequests to include offset and finish_write
information. Offsets are logged for the initial and non-sequential per
successive write request. Each finish_write true request is logged with
the effective size of the resource at the completion of the write
request, including the current offset and payload. Clarified comments
for WriteDetails, and corrected some comment inconsistencies.

Add logging for QueryWriteStatus calls which occur on progressive writes
to determine an offset to begin a write call on a retry.

Closes bazelbuild#12928.

PiperOrigin-RevId: 355545331
  • Loading branch information
werkt authored and copybara-github committed Feb 4, 2021
1 parent 8fc7787 commit 32fc451
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/util/io",
"//src/main/protobuf:remote_execution_log_java_proto",
"//third_party:flogger",
"//third_party:guava",
"//third_party:jsr305",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile, Clock clock)
return new ReadHandler(); // <ReadRequest, ReadResponse>
} else if (method == ByteStreamGrpc.getWriteMethod()) {
return new WriteHandler(); // <WriteRequest, WriteResponse>
} else if (method == ByteStreamGrpc.getQueryWriteStatusMethod()) {
return new QueryWriteStatusHandler(); // <QueryWriteStatusRequest, QueryWriteStatusResponse>
} else if (method == CapabilitiesGrpc.getGetCapabilitiesMethod()) {
return new GetCapabilitiesHandler(); // <GetCapabilitiesRequest, ServerCapabilities>
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.remote.logging;

import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;

/** LoggingHandler for {@link google.bytestream.QueryWriteStatus} gRPC call. */
public class QueryWriteStatusHandler
implements LoggingHandler<QueryWriteStatusRequest, QueryWriteStatusResponse> {
private final QueryWriteStatusDetails.Builder builder = QueryWriteStatusDetails.newBuilder();

@Override
public void handleReq(QueryWriteStatusRequest message) {
builder.setRequest(message);
}

@Override
public void handleResp(QueryWriteStatusResponse message) {
builder.setResponse(message);
}

@Override
public RpcCallDetails getDetails() {
return RpcCallDetails.newBuilder().setQueryWriteStatus(builder).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,40 @@

import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** LoggingHandler for {@link google.bytestream.Write} gRPC call. */
public class WriteHandler implements LoggingHandler<WriteRequest, WriteResponse> {
private final WriteDetails.Builder builder = WriteDetails.newBuilder();
private final Set<String> resources = new LinkedHashSet<>();
private final List<Long> offsets = new ArrayList<>();
private final List<Long> finishWrites = new ArrayList<>();
private long bytesSentInSequence = 0;
private long numWrites = 0;
private long bytesSent = 0;

@Override
public void handleReq(WriteRequest message) {
resources.add(message.getResourceName());
long writeOffset = message.getWriteOffset();
if (numWrites == 0 || Iterables.getLast(offsets) + bytesSentInSequence != writeOffset) {
offsets.add(writeOffset);
bytesSentInSequence = 0;
}
int size = message.getData().size();
if (message.getFinishWrite()) {
finishWrites.add(writeOffset + size);
}

numWrites++;
bytesSent += message.getData().size();
bytesSent += size;
bytesSentInSequence += size;
}

@Override
Expand All @@ -44,6 +60,8 @@ public void handleResp(WriteResponse message) {
@Override
public RpcCallDetails getDetails() {
builder.addAllResourceNames(resources);
builder.addAllOffsets(offsets);
builder.addAllFinishWrites(finishWrites);
builder.setNumWrites(numWrites);
builder.setBytesSent(bytesSent);
return RpcCallDetails.newBuilder().setWrite(builder).build();
Expand Down
34 changes: 29 additions & 5 deletions src/main/protobuf/remote_execution_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ syntax = "proto3";

package remote_logging;

import "build/bazel/remote/execution/v2/remote_execution.proto";
import "google/protobuf/timestamp.proto";
import "google/bytestream/bytestream.proto";
import "google/longrunning/operations.proto";
import "google/protobuf/timestamp.proto";
import "google/rpc/status.proto";
import "build/bazel/remote/execution/v2/remote_execution.proto";

option java_package = "com.google.devtools.build.lib.remote.logging";

Expand Down Expand Up @@ -83,7 +83,7 @@ message GetActionResultDetails {
// Details for a call to
// build.bazel.remote.execution.v2.ActionCache.UpdateActionResult.
message UpdateActionResultDetails {
// The build.bazel.remote.execution.v2.GetActionResultRequest sent by
// The build.bazel.remote.execution.v2.UpdateActionResultRequest sent by
// the call.
build.bazel.remote.execution.v2.UpdateActionResultRequest request = 1;

Expand Down Expand Up @@ -128,10 +128,24 @@ message ReadDetails {
message WriteDetails {
// The names of resources requested to be written to in this call in the order
// they were first requested in. If the ByteStream protocol is followed
// according to specification, this should only contain have a single element,
// which is the resource name specified in the first message of the stream.
// according to specification, this should contain at most two elements:
// The resource name specified in the first message of the stream, and an
// empty string specified in each successive request if num_writes > 1.
repeated string resource_names = 1;

// The offsets sent for the initial request and any non-sequential offsets
// specified over the course of the call. If the ByteStream protocol is
// followed according to specification, this should contain a single element
// which is the starting point for the write call.
repeated int64 offsets = 5;

// The effective final size for each request sent with finish_write true
// specified over the course of the call. If the ByteStream protocol is
// followed according to specification, this should contain a single element
// which is the total size of the written resource, including the initial
// offset.
repeated int64 finish_writes = 6;

// The number of writes performed in this call.
int64 num_writes = 2;

Expand All @@ -142,6 +156,15 @@ message WriteDetails {
google.bytestream.WriteResponse response = 4;
}

// Details for a call to google.bytestream.QueryWriteStatus.
message QueryWriteStatusDetails {
// The google.bytestream.QueryWriteStatusRequest sent by the call.
google.bytestream.QueryWriteStatusRequest request = 1;

// The received google.bytestream.QueryWriteStatusResponse.
google.bytestream.QueryWriteStatusResponse response = 2;
}

// Contains details for specific types of calls.
message RpcCallDetails {
reserved 1 to 4, 11;
Expand All @@ -152,6 +175,7 @@ message RpcCallDetails {
FindMissingBlobsDetails find_missing_blobs = 10;
ReadDetails read = 5;
WriteDetails write = 6;
QueryWriteStatusDetails query_write_status = 14;
GetCapabilitiesDetails get_capabilities = 12;
UpdateActionResultDetails update_action_result = 13;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
Expand All @@ -58,6 +60,7 @@
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetActionResultDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetCapabilitiesDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.UpdateActionResultDetails;
Expand Down Expand Up @@ -854,6 +857,10 @@ public void onCompleted() {
WriteDetails.newBuilder()
.addResourceNames("test1")
.addResourceNames("test2")
.addOffsets(0)
.addOffsets(0)
.addOffsets(0)
// finish write is empty
.setResponse(response)
.setBytesSent(9)
.setNumWrites(3)))
Expand All @@ -865,6 +872,81 @@ public void onCompleted() {
verify(logStream).write(expectedEntry);
}

@Test
public void testWriteCallOffsetAndFinishWriteCompounding() {
WriteRequest request1 =
WriteRequest.newBuilder()
.setResourceName("test1")
.setData(ByteString.copyFromUtf8("abc"))
.setWriteOffset(10)
.build();
WriteRequest request2 =
WriteRequest.newBuilder()
.setData(ByteString.copyFromUtf8("def"))
.setWriteOffset(request1.getWriteOffset() + request1.getData().size())
.build();
WriteResponse response = WriteResponse.newBuilder().setCommittedSize(6).build();
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest writeRequest) {}

@Override
public void onError(Throwable throwable) {}

@Override
public void onCompleted() {
streamObserver.onNext(response);
streamObserver.onCompleted();
}
};
}
});
ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel);
@SuppressWarnings("unchecked")
StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);

clock.advanceMillis(10000);
// Request three writes, the first identical with the third, but offset correctly and
// finish_writing
StreamObserver<WriteRequest> requester = stub.write(responseObserver);
requester.onNext(request1);
clock.advanceMillis(100);
requester.onNext(request2);
clock.advanceMillis(200);
requester.onNext(
request1.toBuilder()
.setWriteOffset(request2.getWriteOffset() + request2.getData().size())
.setFinishWrite(true)
.build());
clock.advanceMillis(100);
requester.onCompleted();

LogEntry expectedEntry =
LogEntry.newBuilder()
.setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
.setDetails(
RpcCallDetails.newBuilder()
.setWrite(
WriteDetails.newBuilder()
.addResourceNames("test1")
.addResourceNames("")
.addOffsets(request1.getWriteOffset())
.addFinishWrites(
10 + request1.getData().size() * 2 + request2.getData().size())
.setResponse(response)
.setBytesSent(9)
.setNumWrites(3)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
.setStartTime(Timestamp.newBuilder().setSeconds(10))
.setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000))
.build();
verify(logStream).write(expectedEntry);
}

@Test
public void testWriteCallFail() {
WriteRequest request =
Expand All @@ -881,7 +963,6 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
return Mockito.mock(StreamObserver.class);
}
});

ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel);
@SuppressWarnings("unchecked")
StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
Expand All @@ -894,7 +975,6 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
requester.onError(error.asRuntimeException());

Status expectedCancel = Status.CANCELLED.withCause(error.asRuntimeException());

LogEntry expectedEntry =
LogEntry.newBuilder()
.setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
Expand All @@ -907,12 +987,50 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
.setWrite(
WriteDetails.newBuilder()
.addResourceNames("test")
.addOffsets(0)
.setNumWrites(1)
.setBytesSent(3)))
.setStartTime(Timestamp.newBuilder().setSeconds(10000000))
.setEndTime(Timestamp.newBuilder().setSeconds(20000000))
.build();
verify(logStream).write(expectedEntry);
}

@Test
public void testQueryWriteStatusCallOk() {
QueryWriteStatusRequest request =
QueryWriteStatusRequest.newBuilder().setResourceName("test").build();
QueryWriteStatusResponse response =
QueryWriteStatusResponse.newBuilder().setCommittedSize(10).build();
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void queryWriteStatus(
QueryWriteStatusRequest request,
StreamObserver<QueryWriteStatusResponse> responseObserver) {
clock.advanceMillis(22222);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
});
ByteStreamBlockingStub stub = ByteStreamGrpc.newBlockingStub(loggedChannel);

clock.advanceMillis(11111);
stub.queryWriteStatus(request);

LogEntry expectedEntry =
LogEntry.newBuilder()
.setMethodName(ByteStreamGrpc.getQueryWriteStatusMethod().getFullMethodName())
.setDetails(
RpcCallDetails.newBuilder()
.setQueryWriteStatus(
QueryWriteStatusDetails.newBuilder()
.setRequest(request)
.setResponse(response)))
.setStatus(com.google.rpc.Status.getDefaultInstance())
.setStartTime(Timestamp.newBuilder().setSeconds(11).setNanos(111000000))
.setEndTime(Timestamp.newBuilder().setSeconds(33).setNanos(333000000))
.build();
verify(logStream).write(expectedEntry);
}
}

0 comments on commit 32fc451

Please sign in to comment.