Skip to content

Commit dc27e79

Browse files
authored
Add the WriteBlob bidirectional streaming RPC to the write service. (#3226)
1 parent a0ce0e7 commit dc27e79

File tree

3 files changed

+168
-0
lines changed

3 files changed

+168
-0
lines changed

tensorboard/uploader/proto/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ exports_files(["LICENSE"])
1010
tb_proto_library(
1111
name = "protos_all",
1212
srcs = [
13+
"blob.proto",
1314
"export_service.proto",
1415
"scalar.proto",
1516
"server_info.proto",
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
syntax = "proto3";
2+
3+
package tensorboard.service;
4+
5+
enum BlobState {
6+
// Object state is unknown. This value should never be used; it is present
7+
// only as a proto3 best practice.
8+
// See https://developers.google.com/protocol-buffers/docs/proto3#enum
9+
UNKNOWN = 0;
10+
// Object is being written and not yet finalized.
11+
UNFINALIZED = 1;
12+
// Object is finalized.
13+
CURRENT = 2;
14+
}

tensorboard/uploader/proto/write_service.proto

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package tensorboard.service;
88
import "tensorboard/uploader/proto/export_service.proto";
99
import "tensorboard/uploader/proto/scalar.proto";
1010
import "tensorboard/uploader/proto/tensor.proto";
11+
import "tensorboard/uploader/proto/blob.proto";
1112
import "tensorboard/compat/proto/summary.proto";
1213

1314
// Service for writing data to TensorBoard.dev.
@@ -30,6 +31,16 @@ service TensorBoardWriterService {
3031
rpc WriteScalar(WriteScalarRequest) returns (WriteScalarResponse) {}
3132
// Request additional tensor data be stored in TensorBoard.dev.
3233
rpc WriteTensor(WriteTensorRequest) returns (WriteTensorResponse) {}
34+
// Request to obtain a specific BlobSequence entry, creating it if needed,
35+
// to be subsequently populated with blobs.
36+
rpc GetOrCreateBlobSequence(GetOrCreateBlobSequenceRequest)
37+
returns (GetOrCreateBlobSequenceResponse) {}
38+
// Request the current status of blob data being stored in TensorBoard.dev,
39+
// to support resumable uploads.
40+
rpc GetBlobMetadata(GetBlobMetadataRequest)
41+
returns (GetBlobMetadataResponse) {}
42+
// Request additional blob data be stored in TensorBoard.dev.
43+
rpc WriteBlob(stream WriteBlobRequest) returns (stream WriteBlobResponse) {}
3344
// Request that the calling user and all their data be permanently deleted.
3445
// Used for testing purposes.
3546
rpc DeleteOwnUser(DeleteOwnUserRequest) returns (DeleteOwnUserResponse) {}
@@ -190,6 +201,148 @@ message WriteTensorResponse {
190201
// This is empty on purpose.
191202
}
192203

204+
// Writing a blob sequence
205+
// =======================
206+
// * The caller requests creation of a new BlobSequence, via the
207+
// GetOrCreateBlobSequence RPC. The response provides a new BlobSequence ID.
208+
// * The caller writes each blob in the sequence separately via the WriteBlob
209+
// RPC as described below, providing the blob sequence ID and sequence index
210+
// for each blob.
211+
// * Multiple WriteBlob streams may operate in parallel (for different blobs),
212+
// with no constraint on their ordering.
213+
214+
// Resuming a partially written blob sequence
215+
// ==========================================
216+
// * The client starts the upload process with a new GetOrCreateBlobSequence
217+
// RPC.
218+
// * The blob sequences are uniquely keyed by experiment, run, tag, and step.
219+
// Since this is a second (or later) attempt regarding an existing sequence,
220+
// a new sequence is not created. Instead the existing sequence ID is
221+
// returned.
222+
// * The client attempts to upload each blob in the sequence. These uploads
223+
// resume or may be skipped entirely, as needed, according to the below
224+
// procedure.
225+
226+
// Writing a blob
227+
// ==============
228+
// * The caller instantiates a WriteBlob RPC, which is a bidirectional stream.
229+
// * The caller sends one or more WriteBlobRequests, providing the blob data
230+
// in an ordered sequence of chunks.
231+
// * The caller receives a sequence of WriteBlobResponses in return. Each
232+
// WriteBlobResponse corresponds to one WriteBlobRequest in order, reporting
233+
// the state of the write operation.
234+
// * The caller should not wait to verify that each response reports the
235+
// expected status and size before sending the next request. Doing so reduces
236+
// throughput, with little benefit since any mismatch on the server side will
237+
// cause the RPC to throw an error anyway.
238+
// * Nonetheless the caller may wish to validate these values, if provided,
239+
// whenever the response does arrive.
240+
// * The request size should be less than 4 MiB (2^20 = 4,194,304), because
241+
// that is the default request limit for gRPC. Accounting for the other
242+
// fields of the request, then, and leaving some padding for safety, the data
243+
// chunk should be somewhat smaller-- perhaps simply 4e6 bytes.
244+
// * The last of these requests must set finalize_object=True, and the prior
245+
// ones must not.
246+
// * For a blob <= 4MB, a single request suffices, so it is the last chunk.
247+
248+
// Resuming a partial blob upload
249+
// ==============================
250+
// * When the client begins an upload by the usual method above, it may turn
251+
// out that the data chunk is redundant with a previous partial upload.
252+
// The serve will raise an error in this case, closing the stream.
253+
// * At this point the client should issue a GetBlobMetadataRequest. The
254+
// GetBlobMetadataResponse reports the state, size, and crc32c of the object
255+
// so far, i.e. taking into account any previously uploaded data.
256+
// * The client checks whether any data remains to upload, and optionally
257+
// validates the cumulative crc32c against the local file (up to the given
258+
// offset).
259+
// * If needed, the client starts a new WriteBlob stream, starting from the
260+
// specified offset.
261+
// * The client may choose to call GetBlobMetadata before starting any
262+
// WriteBlob stream, to avoid the potential for wasted transfer of a
263+
// redundant chunk, at the cost of the initial roundtrip to obtain the
264+
// status. It is up to the client to weigh this tradeoff.
265+
266+
// Obtain a unique ID for a blob sequence, given the composite key
267+
// (experiment_id, run, tag, step). If such a blob sequence already exists,
268+
// return its ID. If not, create it first, and return the new ID.
269+
message GetOrCreateBlobSequenceRequest {
270+
// Service-wide unique identifier of an uploaded log dir.
271+
// eg: "1r9d0kQkh2laODSZcQXWP"
272+
string experiment_id = 1;
273+
// The name of the run to which the blob sequence belongs, for example
274+
// "/some/path/mnist_experiments/run1/".
275+
string run = 2;
276+
// The name of the tag to which the blob sequence belongs, for example "loss".
277+
string tag = 3;
278+
// Step index within the run.
279+
int64 step = 4;
280+
// The total number of elements expected in the sequence.
281+
// This effectively delares a number of initially empty 'upload slots',
282+
// to be filled with subsequent WriteBlob RPCs.
283+
int64 final_sequence_length = 5;
284+
// Note that metadata.plugin_data.content does not carry the payload.
285+
.tensorboard.SummaryMetadata metadata = 6;
286+
}
287+
288+
message GetOrCreateBlobSequenceResponse {
289+
// A unique ID for the the requested blob sequence.
290+
string blob_sequence_id = 1;
291+
}
292+
293+
message GetBlobMetadataRequest {
294+
// The ID of the BlobSequence of which this blob is a member.
295+
string blob_sequence_id = 1;
296+
// The position of this Blob within the BlobSequence
297+
int64 index = 2;
298+
}
299+
300+
message GetBlobMetadataResponse {
301+
// State of the object (still appending vs. complete).
302+
BlobState blob_state = 1;
303+
// Size of the object in bytes. In the case of a partial upload, this
304+
// reflects only the data actually received so far.
305+
int64 size = 2;
306+
// crc32c of the blob data stored so far, i.e. over the byte range [0, size).
307+
fixed32 crc32c = 3;
308+
}
309+
310+
// A single chunk of the blob write stream.
311+
//
312+
// Note that the WriteBlobRequest does not mirror the nested structure of
313+
// WriteScalarRequest, because we only ever send one blob at a time.
314+
message WriteBlobRequest {
315+
// The ID of the BlobSequence of which this blob is a member.
316+
string blob_sequence_id = 1;
317+
// The position of this Blob within the BlobSequence
318+
int64 index = 2;
319+
// The bytes in this chunk.
320+
bytes data = 3;
321+
// The position in the blob where this chunk begins.
322+
// This must equal the sum of the sizes of the chunks sent so far. Ignored
323+
// if no data is provided.
324+
int64 offset = 4;
325+
// CRC32C of current data buffer. Clients must include the crc32c for every
326+
// data buffer, to protect against data corruption. Note that for multi-shot
327+
// writes, specifying the crc32c for every data buffer provides stronger
328+
// protection than just providing the final_crc32c at the end of the upload.
329+
fixed32 crc32c = 5;
330+
// Indicates that this is the last chunk of the stream.
331+
bool finalize_object = 6;
332+
// CRC32C of the entire blob. Required, to protect against data corruption.
333+
// This should be set only when finalize_object=True.
334+
fixed32 final_crc32c = 7;
335+
}
336+
337+
message WriteBlobResponse {
338+
// State of the object (still appending vs. complete).
339+
BlobState blob_state = 1;
340+
// Size of the object in bytes. This is the sum of the chunk sizes
341+
// received from the stream so far. In the response to the final chunk,
342+
// this size should equal the total size of the blob.
343+
int64 size = 2;
344+
}
345+
193346
// Requests that the calling user and all their data be permanently deleted.
194347
message DeleteOwnUserRequest {
195348
// This is empty on purpose.

0 commit comments

Comments
 (0)